You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 02:00:29 UTC
[3/6] git commit: Added necessary configuration to run in backfill
mode
Added necessary configuration to run in backfill mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e71b13fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e71b13fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e71b13fb
Branch: refs/heads/master
Commit: e71b13fb8424572feda9516df234168c0ca9a2d9
Parents: e5c95d6
Author: mfranklin <mf...@apache.org>
Authored: Mon May 26 10:53:35 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon May 26 20:00:13 2014 -0400
----------------------------------------------------------------------
.../sysomos/provider/SysomosProvider.java | 83 +++++++++++++++++---
1 file changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e71b13fb/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index eaf9eac..15c34cb 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -19,7 +19,9 @@
package org.apache.streams.sysomos.provider;
+import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import com.sysomos.SysomosConfiguration;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
@@ -30,20 +32,32 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Streams Provider for the Sysomos Heartbeat API
+ *
+ * Configuration:
+ * The provider takes either a Map<String,Object> containing the mode (backfill and terminate OR continuous) and a
+ * Map<String,String> of heartbeat IDs to document target ids or a string of the format ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId}
+ * This configuration will configure the provider to backfill to the specified document and either terminate or not
+ * depending on the mode flag. Continuous mode is assumed, and is the ony mode supported by the String configuration.
+ *
*/
public class SysomosProvider implements StreamsProvider {
- public final static String STREAMS_ID = "SysomosProvider";
+ public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
- private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+ public static final String STREAMS_ID = "SysomosProvider";
+ public static final String MODE_KEY = "mode";
+ public static final String STARTING_DOCS_KEY = "startingDocs";
public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds
public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
@@ -51,6 +65,7 @@ public class SysomosProvider implements StreamsProvider {
protected volatile Queue<StreamsDatum> providerQueue;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Set<String> completedHeartbeats = Sets.newHashSet();
private final long maxQueued;
private final long minLatency;
private final long scheduledLatency;
@@ -59,6 +74,8 @@ public class SysomosProvider implements StreamsProvider {
private SysomosClient client;
private SysomosConfiguration config;
private ScheduledExecutorService stream;
+ private Map<String, String> documentIds;
+ private Mode mode = Mode.CONTINUOUS;
private boolean started = false;
public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
@@ -78,6 +95,22 @@ public class SysomosProvider implements StreamsProvider {
this.config = config;
}
+ public Mode getMode() {
+ return mode;
+ }
+
+ public long getMinLatency() {
+ return minLatency;
+ }
+
+ public long getMaxApiBatch() {
+ return maxApiBatch;
+ }
+
+ public SysomosClient getClient() {
+ return client;
+ }
+
@Override
public void startStream() {
LOGGER.trace("Starting Producer");
@@ -85,7 +118,9 @@ public class SysomosProvider implements StreamsProvider {
LOGGER.trace("Producer not started. Initializing");
stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
for (String heartbeatId : getConfig().getHeartbeatIds()) {
- Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
+ Runnable task = documentIds != null && documentIds.containsKey(heartbeatId) ?
+ new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)) :
+ new SysomosHeartbeatStream(this, heartbeatId);
stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
}
@@ -121,6 +156,11 @@ public class SysomosProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
this.providerQueue = constructQueue();
+ if(configurationObject instanceof Map) {
+ extractConfigFromMap((Map) configurationObject);
+ } else if(configurationObject instanceof String) {
+ documentIds = Splitter.on(",").trimResults().withKeyValueSeparator(":").split((String)configurationObject);
+ }
}
@Override
@@ -143,16 +183,17 @@ public class SysomosProvider implements StreamsProvider {
}
}
- public long getMinLatency() {
- return minLatency;
- }
-
- public long getMaxApiBatch() {
- return maxApiBatch;
- }
+ public void signalComplete(String heartbeatId) {
+ try {
+ this.lock.writeLock().lock();
+ this.completedHeartbeats.add(heartbeatId);
+ if(completedHeartbeats.size() == this.getConfig().getHeartbeatIds().size()) {
+ this.cleanUp();
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
- public SysomosClient getClient() {
- return client;
}
protected void enqueueItem(StreamsDatum datum) {
@@ -185,6 +226,24 @@ public class SysomosProvider implements StreamsProvider {
}
}
+ @SuppressWarnings("unchecked")
+ protected void extractConfigFromMap(Map configMap) {
+ if(configMap.containsKey(MODE_KEY)) {
+ Object configMode = configMap.get(MODE_KEY);
+ if(!(configMode instanceof Mode)) {
+ throw new IllegalStateException("Invalid configuration. Mode must be an instance of the Mode enum but was " + configMode);
+ }
+ this.mode = (Mode)configMode;
+ }
+ if(configMap.containsKey(STARTING_DOCS_KEY)) {
+ Object configIds = configMap.get(MODE_KEY);
+ if(!(configIds instanceof Map)) {
+ throw new IllegalStateException("Invalid configuration. StartingDocs must be an instance of Map<String,String> but was " + configIds);
+ }
+ this.documentIds = (Map)configIds;
+ }
+ }
+
private Queue<StreamsDatum> constructQueue() {
return Queues.newConcurrentLinkedQueue();
}