You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/29 03:37:01 UTC
[08/11] git commit: Implemented provider
Implemented provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ac009075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ac009075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ac009075
Branch: refs/heads/master
Commit: ac009075aeb13af9d253cda88cc3448b54a2440b
Parents: 3b7fd5c
Author: mfranklin <mf...@apache.org>
Authored: Mon Apr 28 14:23:01 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Apr 28 14:23:01 2014 -0400
----------------------------------------------------------------------
.../streams/sysomos/SysomosHeartbeatTask.java | 51 ++++++++
.../apache/streams/sysomos/SysomosProvider.java | 131 +++++++++++++------
.../streams/sysomos/SysomosProviderTask.java | 67 ----------
.../com/sysomos/SysomosConfiguration.json | 10 +-
4 files changed, 148 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
new file mode 100644
index 0000000..6cb1345
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
+ */
+public class SysomosHeartbeatTask implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatTask.class);
+
+ private final SysomosProvider provider; //Owning provider; run() reads the API key from its configuration
+ private SysomosClient client; //Client handed in by the provider; only created in run() when none was supplied
+ private final String heartbeatId;
+ private final long maxApiBatch;
+ private long currentOffset = 0L;
+
+ public SysomosHeartbeatTask(SysomosProvider provider, SysomosClient client, String heartbeatId, long maxApiBatch) {
+ this.provider = provider;
+ this.client = client;
+ this.heartbeatId = heartbeatId;
+ this.maxApiBatch = maxApiBatch;
+ }
+
+ @Override
+ public void run() {
+ if (client == null) { //Reuse the injected client instead of building a new one on every scheduled run
+ client = new SysomosClient(provider.getConfig().getApiKey());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
index 35c765c..cb647fd 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
@@ -19,9 +19,9 @@
package org.apache.streams.sysomos;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
import com.sysomos.SysomosConfiguration;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
@@ -29,46 +29,42 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * Wrapper for the Sysomos API.
+ * Streams Provider for the Sysomos Heartbeat API
*/
public class SysomosProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+ public static final int LATENCY = 10000; //Default latency for querying the Sysomos API in milliseconds
+ public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
+ public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
- private SysomosConfiguration config;
+ protected volatile Queue<StreamsDatum> providerQueue = constructQueue(); //Never null: enqueueItem()/pauseForSpace() may run before the first readCurrent()
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final long maxQueued;
+ private final long latency;
+ private final long maxApiBatch;
- private List<String> apiKeys;
- private List<ExecutorService> tasks = new LinkedList<ExecutorService>();
+ private SysomosClient client;
+ private SysomosConfiguration config;
+ private ScheduledExecutorService stream;
private boolean started = false;
+ /**
+ * Creates a provider from the supplied configuration. Optional values that are unset (null) fall back to
+ * {@link #PROVIDER_BATCH_SIZE}, {@link #LATENCY} and {@link #API_BATCH_SIZE} respectively.
+ *
+ * @param sysomosConfiguration configuration supplying the API key and the optional queue, delay and batch limits
+ */
public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
- this.apiKeys = Lists.newArrayList();
+ this.config = sysomosConfiguration;
+ this.client = new SysomosClient(sysomosConfiguration.getApiKey());
+ this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
+ this.latency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
+ this.maxApiBatch = sysomosConfiguration.getApiBatchSize() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize(); //Null-check the value that is actually unboxed
}
- public static final String BASE_URL_STRING = "http://api.sysomos.com/";
- private static final String DATE_FORMAT_STRING = "yyyy-MM-dd'T'hh:mm:ssZ";
- private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
- private static Pattern _pattern = Pattern.compile("code: ([0-9]+)");
-
- public static final int LATENCY = 10;
-
- private String apiKey;
-
public SysomosConfiguration getConfig() {
return config;
}
@@ -77,22 +73,16 @@ public class SysomosProvider implements StreamsProvider {
this.config = config;
}
- protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
- SysomosProviderTask task;
- ScheduledExecutorService service;
-
@Override
public void startStream() {
LOGGER.trace("Starting Producer");
- if(!started) {
+ if (!started) {
LOGGER.trace("Producer not started. Initializing");
- service = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
- for(String heartbeatId : getConfig().getHeartbeatIds()) {
- task = new SysomosProviderTask(this, heartbeatId);
- service.scheduleWithFixedDelay(task, 0, LATENCY, TimeUnit.SECONDS);
- LOGGER.info("Started producer for {} with service {}", getConfig().getApiKey(), service.toString());
- this.tasks.add(service);
+ stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
+ for (String heartbeatId : getConfig().getHeartbeatIds()) {
+ Runnable task = new SysomosHeartbeatTask(this, this.client, heartbeatId, this.maxApiBatch);
+ stream.scheduleWithFixedDelay(task, 0, this.latency, TimeUnit.MILLISECONDS);
+ LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
}
started = true;
}
@@ -100,26 +90,83 @@ public class SysomosProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
- return null;
+ StreamsResultSet current;
+ lock.writeLock().lock(); //Acquire before the try so the finally block only ever releases a lock we hold
+ try {
+ current = new StreamsResultSet(providerQueue); //Write lock is exclusive: no enqueueItem() holds the read lock during the swap
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return current;
}
@Override
public StreamsResultSet readNew(BigInteger bigInteger) {
- return null;
+ throw new NotImplementedException("readNew not currently implemented");
}
@Override
public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
- return null;
+ throw new NotImplementedException("readRange not currently implemented");
}
@Override
public void prepare(Object configurationObject) {
-
+ //NOP
}
@Override
public void cleanUp() {
+ if (stream == null) {
+ return; //startStream() never created the executor, so there is nothing to shut down
+ }
+ stream.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+ stream.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ stream.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ /**
+ * Adds the datum to the provider queue, first waiting for the queue to drop below its size threshold.
+ * Retries until the queue accepts the datum.
+ *
+ * @param datum the item to make available to the next readCurrent() call
+ */
+ protected void enqueueItem(StreamsDatum datum) {
+ boolean success;
+ do {
+ pauseForSpace(); //Don't lock before this pause. We don't want to block the readCurrent method
+ lock.readLock().lock(); //Acquire outside the try: the finally block must never unlock a lock that was not taken
+ try {
+ success = providerQueue.offer(datum);
+ Thread.yield();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ while (!success);
}
+
+ /**
+ * Wait for the queue size to be below threshold before allowing execution to continue on this thread.
+ * If the thread is interrupted while waiting, the interrupt status is restored and the wait ends early.
+ */
+ private void pauseForSpace() {
+ while(this.providerQueue.size() >= maxQueued) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread was interrupted", e);
+ Thread.currentThread().interrupt(); //Preserve interrupt status, as cleanUp() does
+ return; //Stop waiting; otherwise shutdownNow() in cleanUp() could never cancel a task stuck on a full queue
+ }
+ }
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ final Queue<StreamsDatum> emptyQueue = Queues.newConcurrentLinkedQueue(); //Fresh queue; readCurrent() swaps one in after each read
+ return emptyQueue;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
deleted file mode 100644
index b12fb04..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProviderTask.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.SysomosConfiguration;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Queue;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Wrapper for the Sysomos API.
- */
-public class SysomosProviderTask implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProviderTask.class);
-
- private SysomosConfiguration config;
-
- private SysomosProvider provider;
-
- private SysomosClient client;
-
- private String heartbeatId;
-
- public SysomosProviderTask(SysomosProvider provider, String heartbeatId) {
- this.provider = provider;
- this.heartbeatId = heartbeatId;
- }
-
- @Override
- public void run() {
-
- client = new SysomosClient(provider.getConfig().getApiKey());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac009075/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
index 7a39d19..cc0c4f7 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
+++ b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json
@@ -20,10 +20,16 @@
}
},
"minDelayMs": {
- "type": "long"
+ "type": "string",
+ "format": "utc-millisec"
},
"maxBatchSize": {
- "type": "long"
+ "type": "string",
+ "format": "utc-millisec"
+ },
+ "apiBatchSize": {
+ "type": "string",
+ "format": "utc-millisec"
}
}
}
\ No newline at end of file