You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:04 UTC
[03/10] git commit: STREAMS-71 | Updated organization for easier
update of operation
STREAMS-71 | Updated organization for easier update of operation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/23fc0119
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/23fc0119
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/23fc0119
Branch: refs/heads/master
Commit: 23fc01199933a97be8414826f91eca12174b3789
Parents: 7afc27c
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:08:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:08:31 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 73 ++++++++++++--------
1 file changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/23fc0119/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index f6f4f29..c234cb1 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,24 +79,32 @@ public class SysomosHeartbeatStream implements Runnable {
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
LOGGER.debug("Querying API to match last ID of {}", lastID);
- result = executeAPIRequest();
+ result = queryAPI();
//Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
//Since offset is calculated at the end of the run, if we detect the need to backfill, it will increment to 1
if(offsetCount == 1) {
mostCurrentId = result.getCurrentId();
}
sleep();
- } while (lastID != null && !result.isMatchedLastId());
+ } while (shouldBackfill(result));
+ updateState(result, mostCurrentId);
+
+ LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ }
+
+ protected void updateState(QueryResult result, String mostCurrentId) {
//Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
//found the specific ID
lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
- provider.signalComplete(heartbeatId);
- enabled = false;
+ shutdown();
LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
}
- LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ }
+
+ protected boolean shouldBackfill(QueryResult result) {
+ return lastID != null && !result.isMatchedLastId();
}
protected void sleep() {
@@ -104,30 +112,12 @@ public class SysomosHeartbeatStream implements Runnable {
Thread.sleep(this.minLatency);
} catch (InterruptedException e) {
LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+ shutdown();
}
}
- protected QueryResult executeAPIRequest() {
- BeatApi.BeatResponse response = null;
- try {
- if(enabled) {
- RequestBuilder requestBuilder = this.client.createRequestBuilder()
- .setHeartBeatId(heartbeatId)
- .setOffset(offsetCount * maxApiBatch)
- .setReturnSetSize(maxApiBatch);
- if(beforeTime != null) {
- requestBuilder.setAddedBeforeDate(beforeTime);
- }
- if(afterTime != null) {
- requestBuilder.setAddedAfterDate(afterTime);
- }
- response = requestBuilder.execute();
-
- LOGGER.debug("Received {} results from API query", response.getCount());
- }
- } catch (Exception e) {
- LOGGER.warn("Error querying Sysomos API", e);
- }
+ protected QueryResult queryAPI() {
+ BeatApi.BeatResponse response = executeAPIRequest();
String currentId = null;
boolean matched = false;
@@ -153,7 +143,36 @@ public class SysomosHeartbeatStream implements Runnable {
return new QueryResult(matched, currentId);
}
- private class QueryResult {
+ protected BeatApi.BeatResponse executeAPIRequest() {
+ BeatApi.BeatResponse response = null;
+ try {
+ if(enabled) {
+ RequestBuilder requestBuilder = this.client.createRequestBuilder()
+ .setHeartBeatId(heartbeatId)
+ .setOffset(offsetCount * maxApiBatch)
+ .setReturnSetSize(maxApiBatch);
+ if(beforeTime != null) {
+ requestBuilder.setAddedBeforeDate(beforeTime);
+ }
+ if(afterTime != null) {
+ requestBuilder.setAddedAfterDate(afterTime);
+ }
+ response = requestBuilder.execute();
+
+ LOGGER.debug("Received {} results from API query", response.getCount());
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error querying Sysomos API", e);
+ }
+ return response;
+ }
+
+ protected void shutdown() {
+ provider.signalComplete(heartbeatId);
+ enabled = false;
+ }
+
+ protected class QueryResult {
private boolean matchedLastId;
private String currentId;