You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 02:00:30 UTC
[4/6] git commit: Updated stream to support backfill
Updated stream to support backfill
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2570fa76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2570fa76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2570fa76
Branch: refs/heads/master
Commit: 2570fa76c2f33af8fe6ddaa2cbee8ba68010c763
Parents: e71b13f
Author: mfranklin <mf...@apache.org>
Authored: Mon May 26 10:54:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon May 26 20:00:13 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 26 +++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2570fa76/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 9cd5898..2719fb2 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -37,11 +37,17 @@ public class SysomosHeartbeatStream implements Runnable {
private final long maxApiBatch;
private final long minLatency;
- private String lastID = null;
+ private String lastID;
+ private boolean enabled = true;
public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
+ this(provider, heartbeatId, null);
+ }
+
+ public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String docId) {
this.provider = provider;
this.heartbeatId = heartbeatId;
+ this.lastID = docId;
this.client = provider.getClient();
this.maxApiBatch = provider.getMaxApiBatch();
@@ -60,6 +66,12 @@ public class SysomosHeartbeatStream implements Runnable {
//Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
//found the specific ID
lastID = result.getCurrentId();
+
+ if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
+ provider.signalComplete(heartbeatId);
+ enabled = false;
+ LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
+ }
LOGGER.debug("Completed current execution with a final docID of {}", lastID);
}
@@ -74,12 +86,14 @@ public class SysomosHeartbeatStream implements Runnable {
protected QueryResult executeAPIRequest() {
BeatApi.BeatResponse response = null;
try {
- response = this.client.createRequestBuilder()
- .setHeartBeatId(heartbeatId)
- .setOffset(0)
- .setReturnSetSize(maxApiBatch).execute();
+ if(enabled) {
+ response = this.client.createRequestBuilder()
+ .setHeartBeatId(heartbeatId)
+ .setOffset(0)
+ .setReturnSetSize(maxApiBatch).execute();
- LOGGER.debug("Received {} results from API query", response.getCount());
+ LOGGER.debug("Received {} results from API query", response.getCount());
+ }
} catch (Exception e) {
LOGGER.warn("Error querying Sysomos API", e);
}