You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 02:00:30 UTC

[4/6] git commit: Updated stream to support backfill

Updated stream to support backfill


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2570fa76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2570fa76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2570fa76

Branch: refs/heads/master
Commit: 2570fa76c2f33af8fe6ddaa2cbee8ba68010c763
Parents: e71b13f
Author: mfranklin <mf...@apache.org>
Authored: Mon May 26 10:54:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon May 26 20:00:13 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2570fa76/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 9cd5898..2719fb2 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -37,11 +37,17 @@ public class SysomosHeartbeatStream implements Runnable {
     private final long maxApiBatch;
     private final long minLatency;
 
-    private String lastID = null;
+    private String lastID;
+    private boolean enabled = true;
 
     public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
+        this(provider, heartbeatId, null);
+    }
+
+    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String docId) {
         this.provider = provider;
         this.heartbeatId = heartbeatId;
+        this.lastID = docId;
 
         this.client = provider.getClient();
         this.maxApiBatch = provider.getMaxApiBatch();
@@ -60,6 +66,12 @@ public class SysomosHeartbeatStream implements Runnable {
         //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
         //found the specific ID
         lastID = result.getCurrentId();
+
+        if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
+            provider.signalComplete(heartbeatId);
+            enabled = false;
+            LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
+        }
         LOGGER.debug("Completed current execution with a final docID of {}", lastID);
     }
 
@@ -74,12 +86,14 @@ public class SysomosHeartbeatStream implements Runnable {
     protected QueryResult executeAPIRequest() {
         BeatApi.BeatResponse response = null;
         try {
-            response = this.client.createRequestBuilder()
-                    .setHeartBeatId(heartbeatId)
-                    .setOffset(0)
-                    .setReturnSetSize(maxApiBatch).execute();
+            if(enabled) {
+                response = this.client.createRequestBuilder()
+                        .setHeartBeatId(heartbeatId)
+                        .setOffset(0)
+                        .setReturnSetSize(maxApiBatch).execute();
 
-            LOGGER.debug("Received {} results from API query", response.getCount());
+                LOGGER.debug("Received {} results from API query", response.getCount());
+            }
         } catch (Exception e) {
             LOGGER.warn("Error querying Sysomos API", e);
         }