You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/30 03:11:50 UTC
[1/2] git commit: STREAMS-135 | Refactored provider to be reactive and not spin lock
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-135 [created] 28406d9c7
STREAMS-135 | Refactored provider to be reactive and not spin lock
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ace69cf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ace69cf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ace69cf3
Branch: refs/heads/STREAMS-135
Commit: ace69cf34f4561429b7ed3ab2402bc956bebe25e
Parents: cbfe01a
Author: mfranklin <mf...@apache.org>
Authored: Tue Jul 29 18:04:26 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Jul 29 18:04:26 2014 -0400
----------------------------------------------------------------------
.../processor/TwitterEventProcessor.java | 56 ++++++---------
.../provider/TwitterStreamProcessor.java | 73 +++++++++++++++++++
.../twitter/provider/TwitterStreamProvider.java | 75 ++++++++++++--------
.../provider/TwitterStreamProviderTask.java | 71 ------------------
.../org/apache/streams/util/ComponentUtils.java | 32 +++++++--
5 files changed, 165 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 1444705..3a42af9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -44,11 +44,12 @@ import java.io.IOException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterEventProcessor implements StreamsProcessor, Runnable {
+public class TwitterEventProcessor implements StreamsProcessor, Callable<List<StreamsDatum>> {
private final static String STREAMS_ID = "TwitterEventProcessor";
@@ -56,55 +57,38 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
private ObjectMapper mapper = new StreamsTwitterMapper();
- private BlockingQueue<String> inQueue;
- private Queue<StreamsDatum> outQueue;
-
private Class inClass;
private Class outClass;
+ private String item;
private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
- public final static String TERMINATE = new String("TERMINATE");
-
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
+ public TwitterEventProcessor(Class inClass, Class outClass, String item) {
this.inClass = inClass;
this.outClass = outClass;
+ this.item = item;
}
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
- this.outClass = outClass;
+ public TwitterEventProcessor(Class inClass, Class outClass) {
+ this(inClass, outClass, null);
}
- public void run() {
-
- while(true) {
- String item;
- try {
-
- item = ComponentUtils.pollUntilStringNotEmpty(inQueue);
-
- if(item instanceof String && item.equals(TERMINATE)) {
- LOGGER.info("Terminating!");
- break;
- }
-
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
- for (StreamsDatum entry : process(rawDatum)) {
- ComponentUtils.offerUntilSuccess(entry, outQueue);
- }
+ public TwitterEventProcessor( Class outClass) {
+ this(null, outClass, null);
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ public TwitterEventProcessor( Class outClass, String item) {
+ this(null, outClass, item);
+ }
- }
+ @Override
+ public List<StreamsDatum> call() throws Exception {
+ if(item != null) {
+ ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+ StreamsDatum rawDatum = new StreamsDatum(objectNode);
+ return process(rawDatum);
}
+ return Lists.newArrayList();
}
public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
new file mode 100644
index 0000000..ec9ecc6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TwitterStreamProcessor extends StringDelimitedProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
+ private static final int DEFAULT_POOL_SIZE = 5;
+
+ private final TwitterStreamProvider provider;
+ private final ExecutorService service;
+
+ public TwitterStreamProcessor(TwitterStreamProvider provider) {
+ this(provider, DEFAULT_POOL_SIZE);
+ }
+
+ public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) {
+ //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
+ super(null);
+ service = Executors.newFixedThreadPool(poolSize);
+ this.provider = provider;
+ }
+
+
+ @Override
+ public boolean process() throws IOException, InterruptedException {
+ String msg = null;
+ do {
+ msg = this.processNextMessage();
+ Thread.sleep(10);
+ } while(msg == null);
+
+ return provider.addDatum(service.submit(new TwitterEventProcessor(String.class, msg)));
+ }
+
+ public void cleanUp() {
+ ComponentUtils.shutdownExecutor(service, 1, 30);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index f7c438c..2e82ea2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -20,15 +20,13 @@ package org.apache.streams.twitter.provider;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.*;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.BasicAuth;
@@ -38,7 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +46,7 @@ import java.math.BigInteger;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by sblackmon on 12/10/13.
@@ -68,17 +67,14 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
this.config = config;
}
- protected BlockingQueue<String> hosebirdQueue;
-
- protected volatile Queue<StreamsDatum> providerQueue;
+ protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
protected Hosts hosebirdHosts;
protected Authentication auth;
protected StreamingEndpoint endpoint;
protected BasicClient client;
-
- protected ListeningExecutorService executor;
-
+ protected AtomicBoolean running = new AtomicBoolean(false);
+ protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
private DatumStatusCounter countersCurrent = new DatumStatusCounter();
private DatumStatusCounter countersTotal = new DatumStatusCounter();
@@ -99,13 +95,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
@Override
public void startStream() {
-
- for (int i = 0; i < 5; i++) {
- executor.submit(new TwitterEventProcessor(hosebirdQueue, providerQueue, String.class));
- }
-
- new Thread(new TwitterStreamProviderTask(this)).start();
-
+ client.connect();
+ running.set(true);
}
@Override
@@ -114,12 +105,13 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
StreamsResultSet current;
synchronized( TwitterStreamProvider.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+ drainTo(drain);
+ current = new StreamsResultSet(drain);
current.setCounter(new DatumStatusCounter());
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
countersCurrent = new DatumStatusCounter();
- providerQueue.clear();
}
return current;
@@ -131,21 +123,18 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
}
@Override
- public StreamsResultSet readRange(DateTime start, DateTime end)
- {
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
throw new NotImplementedException();
}
@Override
public boolean isRunning() {
- return !executor.isShutdown() && !executor.isTerminated();
+ return this.running.get() && !client.isDone();
}
@Override
public void prepare(Object o) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
Preconditions.checkNotNull(config.getEndpoint());
if(config.getEndpoint().equals("userstream") ) {
@@ -217,8 +206,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
- hosebirdQueue = new LinkedBlockingQueue<String>(1000);
- providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+ providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(1000);
client = new ClientBuilder()
.name("apache/streams/streams-contrib/streams-provider-twitter")
@@ -226,20 +214,47 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
.endpoint(endpoint)
.authentication(auth)
.connectionTimeout(1200000)
- .processor(new StringDelimitedProcessor(hosebirdQueue))
+ .processor(processor)
.build();
}
@Override
public void cleanUp() {
- for (int i = 0; i < 5; i++) {
- hosebirdQueue.add(TwitterEventProcessor.TERMINATE);
- }
+ this.client.stop();
+ this.processor.cleanUp();
+ this.running.set(true);
}
@Override
public DatumStatusCounter getDatumStatusCounter() {
return countersTotal;
}
+
+ protected boolean addDatum(Future<List<StreamsDatum>> future) {
+ ComponentUtils.offerUntilSuccess(future, providerQueue);
+ return true;
+ }
+
+ protected void drainTo(Queue<StreamsDatum> drain) {
+ while(!providerQueue.isEmpty()) {
+ for(StreamsDatum datum : pollForDatum()) {
+ ComponentUtils.offerUntilSuccess(datum, drain);
+ }
+ }
+ }
+
+ protected List<StreamsDatum> pollForDatum() {
+ try {
+ return providerQueue.poll().get();
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for future. Initiate shutdown.");
+ this.cleanUp();
+ Thread.currentThread().interrupt();
+ return Lists.newArrayList();
+ } catch (ExecutionException e) {
+ LOGGER.warn("Error getting tweet from future");
+ return Lists.newArrayList();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
deleted file mode 100644
index 13c5d21..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterStreamProviderTask implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProviderTask.class);
-
- private TwitterStreamProvider provider;
-
- public TwitterStreamProviderTask(TwitterStreamProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public void run() {
-
- provider.client.connect();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 6dd6cc7..f6a0b60 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -19,21 +19,24 @@
package org.apache.streams.util;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
/**
- * Created by sblackmon on 3/31/14.
+ * Common utilities for Streams components.
*/
public class ComponentUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
- public static void offerUntilSuccess(Object entry, Queue queue) {
+ public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
boolean success;
do {
- synchronized( ComponentUtils.class ) {
- success = queue.offer(entry);
- }
+ success = queue.offer(entry);
Thread.yield();
}
while( !success );
@@ -55,4 +58,23 @@ public class ComponentUtils {
return result;
}
+ public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
+ stream.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
+ stream.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
+ LOGGER.error("Executor Service did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ stream.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
[2/2] git commit: STREAMS-135 | Adjusted checks for config so that the sample endpoint can be used
Posted by mf...@apache.org.
STREAMS-135 | Adjusted checks for config so that the sample endpoint can be used
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/28406d9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/28406d9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/28406d9c
Branch: refs/heads/STREAMS-135
Commit: 28406d9c732e4871096606b95b71f86261cff31a
Parents: ace69cf
Author: mfranklin <mf...@apache.org>
Authored: Tue Jul 29 19:46:23 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Jul 29 19:46:23 2014 -0400
----------------------------------------------------------------------
.../twitter/provider/TwitterStreamProvider.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/28406d9c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 2e82ea2..9e1ce45 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -151,17 +151,17 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
- Optional<List<String>> track = Optional.fromNullable(config.getTrack());
- Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
+ boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
+ boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
- if( track.isPresent() || follow.isPresent() ) {
+ if( track || follow ) {
LOGGER.debug("***\tPRESENT\t***");
StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
- if( track.isPresent() ) {
- statusesFilterEndpoint.trackTerms(track.get());
+ if( track ) {
+ statusesFilterEndpoint.trackTerms(config.getTrack());
}
- else {
- statusesFilterEndpoint.followings(follow.get());
+ if( follow ) {
+ statusesFilterEndpoint.followings(config.getFollow());
}
this.endpoint = statusesFilterEndpoint;
} else {