You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:05 UTC

[04/10] git commit: STREAMS-71 | Updated to handle multiple modes of backfill

STREAMS-71 | Updated to handle multiple modes of backfill


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b9993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b9993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b9993

Branch: refs/heads/master
Commit: 409b9993e247ce685e9718a246d65c51f64117d5
Parents: 23fc011
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 14:22:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 14:22:18 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b9993/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c234cb1..c0bda15 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -44,6 +44,7 @@ public class SysomosHeartbeatStream implements Runnable {
     private String lastID;
     private DateTime beforeTime;
     private DateTime afterTime;
+    private DateTime lastRunTime;
     private int offsetCount = 0;
     private boolean enabled = true;
 
@@ -76,6 +77,7 @@ public class SysomosHeartbeatStream implements Runnable {
     public void run() {
         QueryResult result;
         String mostCurrentId = null;
+        lastRunTime = DateTime.now();
         //Iff we are trying to get to a specific document ID, continue to query after minimum delay
         do {
             LOGGER.debug("Querying API to match last ID of {}", lastID);
@@ -85,17 +87,22 @@ public class SysomosHeartbeatStream implements Runnable {
             if(offsetCount == 1) {
                 mostCurrentId = result.getCurrentId();
             }
+            updateOffset(result);
             sleep();
-        } while (shouldBackfill(result));
-        updateState(result, mostCurrentId);
+        } while (offsetCount > 0);
 
+        updateState(result, mostCurrentId);
         LOGGER.debug("Completed current execution with a final docID of {}", lastID);
     }
 
     protected void updateState(QueryResult result, String mostCurrentId) {
-        //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
-        //found the specific ID
-        lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+        if(OperatingMode.DOC_MATCH.equals(mode)) {
+            //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+            //found the specific ID
+            lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+        } else {
+            afterTime = lastRunTime;
+        }
 
         if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
             shutdown();
@@ -103,8 +110,13 @@ public class SysomosHeartbeatStream implements Runnable {
         }
     }
 
-    protected boolean shouldBackfill(QueryResult result) {
-        return lastID != null && !result.isMatchedLastId();
+    protected void updateOffset(QueryResult result) {
+        if(OperatingMode.DOC_MATCH.equals(mode)) {
+            //Reset the offset iff we have found a match or this is the first execution
+            offsetCount = lastID == null || result.isMatchedLastId() ? 0 : offsetCount + 1;
+        } else {
+            offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
+        }
     }
 
     protected void sleep() {
@@ -121,6 +133,7 @@ public class SysomosHeartbeatStream implements Runnable {
 
         String currentId = null;
         boolean matched = false;
+        int responseSize = 0;
         if(response != null) {
             for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
                 String docId = beat.getDocid();
@@ -137,10 +150,9 @@ public class SysomosHeartbeatStream implements Runnable {
                 item.getMetadata().put("heartbeat", this.heartbeatId);
                 this.provider.enqueueItem(item);
             }
-            //Reset the offset iff we have found a match or this is the first execution
-            offsetCount = lastID == null || matched ? 0 : offsetCount + 1;
+            responseSize = response.getCount();
         }
-        return new QueryResult(matched, currentId);
+        return new QueryResult(matched, currentId, responseSize);
     }
 
     protected BeatApi.BeatResponse executeAPIRequest() {
@@ -175,10 +187,13 @@ public class SysomosHeartbeatStream implements Runnable {
     protected class QueryResult {
         private boolean matchedLastId;
         private String currentId;
+        private int responseSize;
 
-        private QueryResult(boolean matchedLastId, String currentId) {
+
+        public QueryResult(boolean matchedLastId, String currentId, int responseSize) {
             this.matchedLastId = matchedLastId;
             this.currentId = currentId;
+            this.responseSize = responseSize;
         }
 
         public boolean isMatchedLastId() {
@@ -196,5 +211,13 @@ public class SysomosHeartbeatStream implements Runnable {
         public void setCurrentId(String currentId) {
             this.currentId = currentId;
         }
+
+        public int getResponseSize() {
+            return responseSize;
+        }
+
+        public void setResponseSize(int responseSize) {
+            this.responseSize = responseSize;
+        }
     }
 }