You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/09 01:38:18 UTC
[2/3] git commit: allow timeline provider to run as perpetual stream
allow timeline provider to run as perpetual stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/476748d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/476748d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/476748d4
Branch: refs/heads/STREAMS-115
Commit: 476748d4518ecfc960550e43bd3d25e312c731a4
Parents: c50ce91
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Jun 23 23:53:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 8 12:16:00 2014 -0500
----------------------------------------------------------------------
.../provider/TwitterTimelineProvider.java | 242 ++++++++++---------
.../provider/TwitterTimelineProviderTask.java | 54 ++---
2 files changed, 151 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f9d524..d3c4a51 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,20 +18,14 @@
package org.apache.streams.twitter.provider;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,18 +35,14 @@ import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public class TwitterTimelineProvider implements StreamsProvider, Serializable {
@@ -61,28 +51,28 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
public static final int MAX_NUMBER_WAITING = 10000;
- private TwitterStreamConfiguration config;
+ private TwitterUserInformationConfiguration config;
private Class klass;
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public TwitterStreamConfiguration getConfig() {
+ public TwitterUserInformationConfiguration getConfig() {
return config;
}
- public void setConfig(TwitterStreamConfiguration config) {
+ public void setConfig(TwitterUserInformationConfiguration config) {
this.config = config;
}
+ protected Iterator<Long[]> idsBatches;
+ protected Iterator<String[]> screenNameBatches;
+
protected volatile Queue<StreamsDatum> providerQueue;
protected int idsCount;
protected Twitter client;
- protected Iterator<Long> ids;
- ListenableFuture providerTaskComplete;
-
- protected ListeningExecutorService executor;
+ protected ExecutorService executor;
protected DateTime start;
protected DateTime end;
@@ -98,22 +88,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
- public TwitterTimelineProvider() {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config) {
+ public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
this.config = config;
}
- public TwitterTimelineProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- this.klass = klass;
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
+ public TwitterTimelineProvider(TwitterUserInformationConfiguration config, Class klass) {
this.config = config;
this.klass = klass;
}
@@ -125,101 +104,88 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
@Override
public void startStream() {
LOGGER.debug("{} startStream", STREAMS_ID);
- throw new org.apache.commons.lang.NotImplementedException();
- }
- protected void captureTimeline(long currentId) {
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
- Paging paging = new Paging(1, 200);
- List<Status> statuses = null;
- boolean KeepGoing = true;
- boolean hadFailure = false;
+ LOGGER.info("readCurrent");
- do
- {
- int keepTrying = 0;
+ while(idsBatches.hasNext())
+ loadBatch(idsBatches.next());
- // keep trying to load, give it 5 attempts.
- //This value was chosen because it seemed like a reasonable number of times
- //to retry capturing a timeline given the sorts of errors that could potentially
- //occur (network timeout/interruption, faulty client, etc.)
- while (keepTrying < 5)
- {
+ while(screenNameBatches.hasNext())
+ loadBatch(screenNameBatches.next());
- try
- {
- statuses = client.getUserTimeline(currentId, paging);
- for (Status tStat : statuses) {
- String json = TwitterObjectFactory.getRawJSON(tStat);
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- paging.setPage(paging.getPage() + 1);
-
- keepTrying = 10;
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
- }
- }
- }
- while (shouldContinuePulling(statuses));
+ executor.shutdown();
}
- private Map<Long, Long> userPullInfo;
-
protected boolean shouldContinuePulling(List<Status> statuses) {
return (statuses != null) && (statuses.size() > 0);
}
- private void sleep()
- {
- Thread.yield();
- try {
- // wait one tenth of a millisecond
- Thread.yield();
- Thread.sleep(1);
- Thread.yield();
- }
- catch(IllegalArgumentException e) {
- // passing in static values, this will never happen
- }
- catch(InterruptedException e) {
- // noOp, there must have been an issue sleeping
+ private void loadBatch(Long[] ids) {
+ Twitter client = getTwitterClient();
+ int keepTrying = 0;
+
+ // retry the lookup until it succeeds or the penalties returned by the error handler reach the loop limit below.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+ try
+ {
+ long[] toQuery = new long[ids.length];
+ for(int i = 0; i < ids.length; i++)
+ toQuery[i] = ids[i];
+
+ for (User tStat : client.lookupUsers(toQuery)) {
+
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+ executor.submit(providerTask);
+
+ }
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
}
- Thread.yield();
}
- public StreamsResultSet readCurrent() {
- LOGGER.debug("{} readCurrent", STREAMS_ID);
-
- Preconditions.checkArgument(ids.hasNext());
- StreamsResultSet result;
+ private void loadBatch(String[] ids) {
+ Twitter client = getTwitterClient();
+ int keepTrying = 0;
- StreamsResultSet current;
+ // retry the lookup until it succeeds or the penalties returned by the error handler reach the loop limit below.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+ try
+ {
+ for (User tStat : client.lookupUsers(ids)) {
- synchronized( TwitterTimelineProvider.class ) {
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+ executor.submit(providerTask);
- while( ids.hasNext() ) {
- Long currentId = ids.next();
- LOGGER.info("Provider Task Starting: {}", currentId);
- captureTimeline(currentId);
+ }
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
-
}
+ }
- LOGGER.info("Finished. Cleaning up...");
+ public StreamsResultSet readCurrent() {
LOGGER.info("Providing {} docs", providerQueue.size());
+ StreamsResultSet result;
+
try {
lock.writeLock().lock();
result = new StreamsResultSet(providerQueue);
@@ -228,8 +194,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
} finally {
lock.writeLock().unlock();
}
- running.set(false);
- LOGGER.info("Exiting");
+
+ if( providerQueue.isEmpty() && executor.isTerminated()) {
+ LOGGER.info("Finished. Cleaning up...");
+
+ running.set(false);
+
+ LOGGER.info("Exiting");
+ }
return result;
@@ -291,15 +263,55 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
Preconditions.checkNotNull(config.getOauth().getAccessToken());
Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getFollow());
+ Preconditions.checkNotNull(config.getInfo());
+
+ List<String> screenNames = new ArrayList<String>();
+ List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+ List<Long> ids = new ArrayList<Long>();
+ List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+ for(String s : config.getInfo()) {
+ if(s != null)
+ {
+ String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+ // If the value parses as a long, add it to the user ID list; otherwise add it to the
+ // screen name list.
+ try {
+ ids.add(Long.parseLong(potentialScreenName));
+ } catch (Exception e) {
+ screenNames.add(potentialScreenName);
+ }
+
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+ if(ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<Long>();
+ }
+
+ if(screenNames.size() >= 100) {
+ // add the batch
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+ // reset the screen names
+ screenNames = new ArrayList<String>();
+ }
+ }
+ }
+
+
+ if(ids.size() > 0)
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
- idsCount = config.getFollow().size();
- ids = config.getFollow().iterator();
+ if(screenNames.size() > 0)
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
+ this.idsBatches = idsBatches.iterator();
+ this.screenNameBatches = screenNameBatches.iterator();
- client = getTwitterClient();
}
protected Twitter getTwitterClient()
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f295729..09969d9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,13 +18,11 @@
package org.apache.streams.twitter.provider;
-import org.joda.time.DateTime;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.json.DataObjectFactory;
+import twitter4j.*;
import java.util.List;
@@ -36,12 +34,12 @@ public class TwitterTimelineProviderTask implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
private TwitterTimelineProvider provider;
- private Twitter twitter;
+ private Twitter client;
private Long id;
public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
this.provider = provider;
- this.twitter = twitter;
+ this.client = twitter;
this.id = id;
}
@@ -58,47 +56,43 @@ public class TwitterTimelineProviderTask implements Runnable {
int keepTrying = 0;
// keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
+ //This value was chosen because it seemed like a reasonable number of times
+ //to retry capturing a timeline given the sorts of errors that could potentially
+ //occur (network timeout/interruption, faulty client, etc.)
+ while (keepTrying < 5)
{
try
{
- statuses = twitter.getUserTimeline(id, paging);
+ statuses = client.getUserTimeline(id, paging);
for (Status tStat : statuses)
{
- if( provider.start != null &&
- provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
- {
- // they hit the last date we wanted to collect
- // we can now exit early
- KeepGoing = false;
- }
- // emit the record
- String json = DataObjectFactory.getRawJSON(tStat);
-
- //provider.offer(json);
+ String json = TwitterObjectFactory.getRawJSON(tStat);
+ try {
+ provider.lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
+ } finally {
+ provider.lock.readLock().unlock();
+ }
}
-
paging.setPage(paging.getPage() + 1);
keepTrying = 10;
}
- catch(Exception e)
- {
- hadFailure = true;
- keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e);
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
}
}
- while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
-
- LOGGER.info("Provider Finished. Cleaning up...");
+ while (provider.shouldContinuePulling(statuses));
- LOGGER.info("Provider Exiting");
+ LOGGER.info(id + " Thread Finished");
}