You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:05 UTC
[04/10] git commit: STREAMS-71 | Updated to handle multiple modes of
backfill
STREAMS-71 | Updated to handle multiple modes of backfill
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b9993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b9993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b9993
Branch: refs/heads/master
Commit: 409b9993e247ce685e9718a246d65c51f64117d5
Parents: 23fc011
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 14:22:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 14:22:18 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b9993/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c234cb1..c0bda15 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -44,6 +44,7 @@ public class SysomosHeartbeatStream implements Runnable {
private String lastID;
private DateTime beforeTime;
private DateTime afterTime;
+ private DateTime lastRunTime;
private int offsetCount = 0;
private boolean enabled = true;
@@ -76,6 +77,7 @@ public class SysomosHeartbeatStream implements Runnable {
public void run() {
QueryResult result;
String mostCurrentId = null;
+ lastRunTime = DateTime.now();
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
LOGGER.debug("Querying API to match last ID of {}", lastID);
@@ -85,17 +87,22 @@ public class SysomosHeartbeatStream implements Runnable {
if(offsetCount == 1) {
mostCurrentId = result.getCurrentId();
}
+ updateOffset(result);
sleep();
- } while (shouldBackfill(result));
- updateState(result, mostCurrentId);
+ } while (offsetCount > 0);
+ updateState(result, mostCurrentId);
LOGGER.debug("Completed current execution with a final docID of {}", lastID);
}
protected void updateState(QueryResult result, String mostCurrentId) {
- //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
- //found the specific ID
- lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+ if(OperatingMode.DOC_MATCH.equals(mode)) {
+ //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+ //found the specific ID
+ lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+ } else {
+ afterTime = lastRunTime;
+ }
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
shutdown();
@@ -103,8 +110,13 @@ public class SysomosHeartbeatStream implements Runnable {
}
}
- protected boolean shouldBackfill(QueryResult result) {
- return lastID != null && !result.isMatchedLastId();
+ protected void updateOffset(QueryResult result) {
+ if(OperatingMode.DOC_MATCH.equals(mode)) {
+ //Reset the offset iff we have found a match or this is the first execution
+ offsetCount = lastID == null || result.isMatchedLastId() ? 0 : offsetCount + 1;
+ } else {
+ offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
+ }
}
protected void sleep() {
@@ -121,6 +133,7 @@ public class SysomosHeartbeatStream implements Runnable {
String currentId = null;
boolean matched = false;
+ int responseSize = 0;
if(response != null) {
for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
String docId = beat.getDocid();
@@ -137,10 +150,9 @@ public class SysomosHeartbeatStream implements Runnable {
item.getMetadata().put("heartbeat", this.heartbeatId);
this.provider.enqueueItem(item);
}
- //Reset the offset iff we have found a match or this is the first execution
- offsetCount = lastID == null || matched ? 0 : offsetCount + 1;
+ responseSize = response.getCount();
}
- return new QueryResult(matched, currentId);
+ return new QueryResult(matched, currentId, responseSize);
}
protected BeatApi.BeatResponse executeAPIRequest() {
@@ -175,10 +187,13 @@ public class SysomosHeartbeatStream implements Runnable {
protected class QueryResult {
private boolean matchedLastId;
private String currentId;
+ private int responseSize;
- private QueryResult(boolean matchedLastId, String currentId) {
+
+ public QueryResult(boolean matchedLastId, String currentId, int responseSize) {
this.matchedLastId = matchedLastId;
this.currentId = currentId;
+ this.responseSize = responseSize;
}
public boolean isMatchedLastId() {
@@ -196,5 +211,13 @@ public class SysomosHeartbeatStream implements Runnable {
public void setCurrentId(String currentId) {
this.currentId = currentId;
}
+
+ public int getResponseSize() {
+ return responseSize;
+ }
+
+ public void setResponseSize(int responseSize) {
+ this.responseSize = responseSize;
+ }
}
}