You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/29 03:37:01 UTC

[08/11] git commit: Implemented provider

Implemented provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ac009075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ac009075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ac009075

Branch: refs/heads/master
Commit: ac009075aeb13af9d253cda88cc3448b54a2440b
Parents: 3b7fd5c
Author: mfranklin <mf...@apache.org>
Authored: Mon Apr 28 14:23:01 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Apr 28 14:23:01 2014 -0400

----------------------------------------------------------------------
 .../streams/sysomos/SysomosHeartbeatTask.java   |  51 ++++++++
 .../apache/streams/sysomos/SysomosProvider.java | 131 +++++++++++++------
 .../streams/sysomos/SysomosProviderTask.java    |  67 ----------
 .../com/sysomos/SysomosConfiguration.json       |  10 +-
 4 files changed, 148 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
new file mode 100644
index 0000000..6cb1345
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
+ */
+public class SysomosHeartbeatTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatTask.class);
+
+    private SysomosProvider provider;
+    private SysomosClient client;
+    private String heartbeatId;
+    private long maxApiBatch;
+    private long currentOffset = 0L;
+
+    public SysomosHeartbeatTask(SysomosProvider provider, SysomosClient client, String heartbeatId, long maxApiBatch) {
+        this.provider = provider;
+        this.client = client;
+        this.heartbeatId = heartbeatId;
+        this.maxApiBatch = maxApiBatch;
+    }
+
+    @Override
+    public void run() {
+
+        client = new SysomosClient(provider.getConfig().getApiKey());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
index 35c765c..cb647fd 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
@@ -19,9 +19,9 @@
 
 package org.apache.streams.sysomos;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.sysomos.SysomosConfiguration;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
@@ -29,46 +29,42 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
 import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * Wrapper for the Sysomos API.
+ * Streams Provider for the Sysomos Heartbeat API
  */
 public class SysomosProvider implements StreamsProvider {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+    public static final int LATENCY = 10000;  //Default latency for querying the Sysomos API in milliseconds
+    public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
+    public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
 
-    private SysomosConfiguration config;
+    protected volatile Queue<StreamsDatum> providerQueue;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final long maxQueued;
+    private final long latency;
+    private final long maxApiBatch;
 
-    private List<String> apiKeys;
-    private List<ExecutorService> tasks = new LinkedList<ExecutorService>();
+    private SysomosClient client;
+    private SysomosConfiguration config;
+    private ScheduledExecutorService stream;
     private boolean started = false;
 
     public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
-        this.apiKeys = Lists.newArrayList();
+        this.config = sysomosConfiguration;
+        this.client = new SysomosClient(sysomosConfiguration.getApiKey());
+        this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
+        this.latency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
+        this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
     }
 
-    public static final String BASE_URL_STRING = "http://api.sysomos.com/";
-    private static final String DATE_FORMAT_STRING = "yyyy-MM-dd'T'hh:mm:ssZ";
-    private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
-    private static Pattern _pattern = Pattern.compile("code: ([0-9]+)");
-
-    public static final int LATENCY = 10;
-
-    private String apiKey;
-
     public SysomosConfiguration getConfig() {
         return config;
     }
@@ -77,22 +73,16 @@ public class SysomosProvider implements StreamsProvider {
         this.config = config;
     }
 
-    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
-    SysomosProviderTask task;
-    ScheduledExecutorService service;
-
     @Override
     public void startStream() {
         LOGGER.trace("Starting Producer");
-        if(!started) {
+        if (!started) {
             LOGGER.trace("Producer not started.  Initializing");
-            service = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
-            for(String heartbeatId : getConfig().getHeartbeatIds()) {
-                task = new SysomosProviderTask(this, heartbeatId);
-                service.scheduleWithFixedDelay(task, 0, LATENCY, TimeUnit.SECONDS);
-                LOGGER.info("Started producer for {} with service {}", getConfig().getApiKey(), service.toString());
-                this.tasks.add(service);
+            stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
+            for (String heartbeatId : getConfig().getHeartbeatIds()) {
+                Runnable task = new SysomosHeartbeatTask(this, this.client, heartbeatId, this.maxApiBatch);
+                stream.scheduleWithFixedDelay(task, 0, this.latency, TimeUnit.MILLISECONDS);
+                LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
             }
             started = true;
         }
@@ -100,26 +90,83 @@ public class SysomosProvider implements StreamsProvider {
 
     @Override
     public StreamsResultSet readCurrent() {
-        return null;
+        StreamsResultSet current;
+        try {
+            lock.writeLock().lock();
+            current = new StreamsResultSet(providerQueue);
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return current;
     }
 
     @Override
     public StreamsResultSet readNew(BigInteger bigInteger) {
-        return null;
+        throw new NotImplementedException("readNew not currently implemented");
     }
 
     @Override
     public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        return null;
+        throw new NotImplementedException("readRange not currently implemented");
     }
 
     @Override
     public void prepare(Object configurationObject) {
-
+        //NOP
     }
 
     @Override
     public void cleanUp() {
+        stream.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+                stream.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOGGER.error("Stream did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            stream.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
 
+    protected void enqueueItem(StreamsDatum datum) {
+        boolean success;
+        do {
+            try {
+                pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method
+                lock.readLock().lock();
+                success = providerQueue.offer(datum);
+                Thread.yield();
+            }finally {
+                lock.readLock().unlock();
+            }
+        }
+        while (!success);
     }
+
+    /**
+     * Wait for the queue size to be below threshold before allowing execution to continue on this thread
+     */
+    private void pauseForSpace() {
+        while(this.providerQueue.size() >= maxQueued) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread was interrupted", e);
+            }
+        }
+    }
+
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.newConcurrentLinkedQueue();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
deleted file mode 100644
index b12fb04..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.SysomosConfiguration;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Queue;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Wrapper for the Sysomos API.
- */
-public class SysomosProviderTask implements Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProviderTask.class);
-
-    private SysomosConfiguration config;
-
-    private SysomosProvider provider;
-
-    private SysomosClient client;
-
-    private String heartbeatId;
-
-    public SysomosProviderTask(SysomosProvider provider, String heartbeatId) {
-        this.provider = provider;
-        this.heartbeatId = heartbeatId;
-    }
-
-    @Override
-    public void run() {
-
-        client = new SysomosClient(provider.getConfig().getApiKey());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
index 7a39d19..cc0c4f7 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
+++ b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
@@ -20,10 +20,16 @@
             }
         },
         "minDelayMs": {
-            "type": "long"
+            "type": "String",
+			"format": "utc-millisec"
         },
         "maxBatchSize": {
-            "type": "long"
+            "type": "String",
+			"format": "utc-millisec"
+        },
+        "apiBatchSize": {
+            "type": "String",
+			"format": "utc-millisec"
         }
     }
 }
\ No newline at end of file