You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 02:00:29 UTC

[3/6] git commit: Added necessary configuration to run in backfill mode

Added necessary configuration to run in backfill mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e71b13fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e71b13fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e71b13fb

Branch: refs/heads/master
Commit: e71b13fb8424572feda9516df234168c0ca9a2d9
Parents: e5c95d6
Author: mfranklin <mf...@apache.org>
Authored: Mon May 26 10:53:35 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon May 26 20:00:13 2014 -0400

----------------------------------------------------------------------
 .../sysomos/provider/SysomosProvider.java       | 83 +++++++++++++++++---
 1 file changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e71b13fb/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index eaf9eac..15c34cb 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -19,7 +19,9 @@
 
 package org.apache.streams.sysomos.provider;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import com.sysomos.SysomosConfiguration;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
@@ -30,20 +32,32 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Streams Provider for the Sysomos Heartbeat API
+ *
+ * Configuration:
+ * The provider takes either a Map<String,Object> containing the mode (backfill and terminate OR continuous) and a
+ * Map<String,String> of heartbeat IDs to document target ids or a string of the format ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId}
+ * This configuration will configure the provider to backfill to the specified document and either terminate or not
+ * depending on the mode flag.  Continuous mode is assumed, and is the ony mode supported by the String configuration.
+ *
  */
 public class SysomosProvider implements StreamsProvider {
 
-    public final static String STREAMS_ID = "SysomosProvider";
+    public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
 
+    public static final String STREAMS_ID = "SysomosProvider";
+    public static final String MODE_KEY = "mode";
+    public static final String STARTING_DOCS_KEY = "startingDocs";
     public static final int LATENCY = 10000;  //Default minLatency for querying the Sysomos API in milliseconds
     public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
     public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
@@ -51,6 +65,7 @@ public class SysomosProvider implements StreamsProvider {
     protected volatile Queue<StreamsDatum> providerQueue;
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Set<String> completedHeartbeats = Sets.newHashSet();
     private final long maxQueued;
     private final long minLatency;
     private final long scheduledLatency;
@@ -59,6 +74,8 @@ public class SysomosProvider implements StreamsProvider {
     private SysomosClient client;
     private SysomosConfiguration config;
     private ScheduledExecutorService stream;
+    private Map<String, String> documentIds;
+    private Mode mode = Mode.CONTINUOUS;
     private boolean started = false;
 
     public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
@@ -78,6 +95,22 @@ public class SysomosProvider implements StreamsProvider {
         this.config = config;
     }
 
+    public Mode getMode() {
+        return mode;
+    }
+
+    public long getMinLatency() {
+        return minLatency;
+    }
+
+    public long getMaxApiBatch() {
+        return maxApiBatch;
+    }
+
+    public SysomosClient getClient() {
+        return client;
+    }
+
     @Override
     public void startStream() {
         LOGGER.trace("Starting Producer");
@@ -85,7 +118,9 @@ public class SysomosProvider implements StreamsProvider {
             LOGGER.trace("Producer not started.  Initializing");
             stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
             for (String heartbeatId : getConfig().getHeartbeatIds()) {
-                Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
+                Runnable task = documentIds != null && documentIds.containsKey(heartbeatId) ?
+                        new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)) :
+                        new SysomosHeartbeatStream(this, heartbeatId);
                 stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
                 LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
             }
@@ -121,6 +156,11 @@ public class SysomosProvider implements StreamsProvider {
     @Override
     public void prepare(Object configurationObject) {
         this.providerQueue = constructQueue();
+        if(configurationObject instanceof Map) {
+            extractConfigFromMap((Map) configurationObject);
+        } else if(configurationObject instanceof String) {
+            documentIds = Splitter.on(",").trimResults().withKeyValueSeparator(":").split((String)configurationObject);
+        }
     }
 
     @Override
@@ -143,16 +183,17 @@ public class SysomosProvider implements StreamsProvider {
         }
     }
 
-    public long getMinLatency() {
-        return minLatency;
-    }
-
-    public long getMaxApiBatch() {
-        return maxApiBatch;
-    }
+    public void signalComplete(String heartbeatId) {
+        try {
+            this.lock.writeLock().lock();
+            this.completedHeartbeats.add(heartbeatId);
+            if(completedHeartbeats.size() == this.getConfig().getHeartbeatIds().size()) {
+                this.cleanUp();
+            }
+        } finally {
+            this.lock.writeLock().unlock();
+        }
 
-    public SysomosClient getClient() {
-        return client;
     }
 
     protected void enqueueItem(StreamsDatum datum) {
@@ -185,6 +226,24 @@ public class SysomosProvider implements StreamsProvider {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    protected void extractConfigFromMap(Map configMap) {
+        if(configMap.containsKey(MODE_KEY)) {
+            Object configMode = configMap.get(MODE_KEY);
+            if(!(configMode instanceof Mode)) {
+                throw new IllegalStateException("Invalid configuration.  Mode must be an instance of the Mode enum but was " + configMode);
+            }
+            this.mode = (Mode)configMode;
+        }
+        if(configMap.containsKey(STARTING_DOCS_KEY)) {
+            Object configIds = configMap.get(MODE_KEY);
+            if(!(configIds instanceof Map)) {
+                throw new IllegalStateException("Invalid configuration.  StartingDocs must be an instance of Map<String,String> but was " + configIds);
+            }
+            this.documentIds = (Map)configIds;
+        }
+    }
+
     private Queue<StreamsDatum> constructQueue() {
         return Queues.newConcurrentLinkedQueue();
     }