You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:04 UTC

[03/10] git commit: STREAMS-71 | Updated organization for easier update of operation

STREAMS-71 | Updated organization for easier update of operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/23fc0119
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/23fc0119
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/23fc0119

Branch: refs/heads/master
Commit: 23fc01199933a97be8414826f91eca12174b3789
Parents: 7afc27c
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:08:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:08:31 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 73 ++++++++++++--------
 1 file changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/23fc0119/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index f6f4f29..c234cb1 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,24 +79,32 @@ public class SysomosHeartbeatStream implements Runnable {
         //Iff we are trying to get to a specific document ID, continue to query after minimum delay
         do {
             LOGGER.debug("Querying API to match last ID of {}", lastID);
-            result = executeAPIRequest();
+            result = queryAPI();
             //Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
             //Since offset is calculated at the end of the run, if we detect the need to backfill, it will increment to 1
             if(offsetCount == 1) {
                 mostCurrentId = result.getCurrentId();
             }
             sleep();
-        } while (lastID != null && !result.isMatchedLastId());
+        } while (shouldBackfill(result));
+        updateState(result, mostCurrentId);
+
+        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+    }
+
+    protected void updateState(QueryResult result, String mostCurrentId) {
         //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
         //found the specific ID
         lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
 
         if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
-            provider.signalComplete(heartbeatId);
-            enabled = false;
+            shutdown();
             LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
         }
-        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+    }
+
+    protected boolean shouldBackfill(QueryResult result) {
+        return lastID != null && !result.isMatchedLastId();
     }
 
     protected void sleep() {
@@ -104,30 +112,12 @@ public class SysomosHeartbeatStream implements Runnable {
             Thread.sleep(this.minLatency);
         } catch (InterruptedException e) {
             LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+            shutdown();
         }
     }
 
-    protected QueryResult executeAPIRequest() {
-        BeatApi.BeatResponse response = null;
-        try {
-            if(enabled) {
-                RequestBuilder requestBuilder = this.client.createRequestBuilder()
-                        .setHeartBeatId(heartbeatId)
-                        .setOffset(offsetCount * maxApiBatch)
-                        .setReturnSetSize(maxApiBatch);
-                if(beforeTime != null) {
-                    requestBuilder.setAddedBeforeDate(beforeTime);
-                }
-                if(afterTime != null) {
-                    requestBuilder.setAddedAfterDate(afterTime);
-                }
-                response = requestBuilder.execute();
-
-                LOGGER.debug("Received {} results from API query", response.getCount());
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Error querying Sysomos API", e);
-        }
+    protected QueryResult queryAPI() {
+        BeatApi.BeatResponse response = executeAPIRequest();
 
         String currentId = null;
         boolean matched = false;
@@ -153,7 +143,36 @@ public class SysomosHeartbeatStream implements Runnable {
         return new QueryResult(matched, currentId);
     }
 
-    private class QueryResult {
+    protected BeatApi.BeatResponse executeAPIRequest() {
+        BeatApi.BeatResponse response = null;
+        try {
+            if(enabled) {
+                RequestBuilder requestBuilder = this.client.createRequestBuilder()
+                        .setHeartBeatId(heartbeatId)
+                        .setOffset(offsetCount * maxApiBatch)
+                        .setReturnSetSize(maxApiBatch);
+                if(beforeTime != null) {
+                    requestBuilder.setAddedBeforeDate(beforeTime);
+                }
+                if(afterTime != null) {
+                    requestBuilder.setAddedAfterDate(afterTime);
+                }
+                response = requestBuilder.execute();
+
+                LOGGER.debug("Received {} results from API query", response.getCount());
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Error querying Sysomos API", e);
+        }
+        return response;
+    }
+
+    protected void shutdown() {
+        provider.signalComplete(heartbeatId);
+        enabled = false;
+    }
+
+    protected class QueryResult {
         private boolean matchedLastId;
         private String currentId;