You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/09 01:38:17 UTC
[1/3] git commit: sleep for nonzero millis
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-115 [created] b7a65e1ea
sleep for nonzero millis
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8b128409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8b128409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8b128409
Branch: refs/heads/STREAMS-115
Commit: 8b12840903eeb9563381d2e0962d83e118034181
Parents: 476748d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Jun 26 18:49:57 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 8 12:16:00 2014 -0500
----------------------------------------------------------------------
.../streams/twitter/provider/TwitterErrorHandler.java | 2 +-
.../twitter/provider/TwitterTimelineProvider.java | 14 +++++---------
2 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8b128409/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index f7af7f5..1f00129 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -30,7 +30,7 @@ public class TwitterErrorHandler
{
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
- protected static final long initial_backoff = 1000;
+ protected static final long initial_backoff = 2*60*1000;
protected static long backoff = initial_backoff;
public static int handleTwitterError(Twitter twitter, Exception exception)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8b128409/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index d3c4a51..73ac03e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -20,7 +20,6 @@ package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -82,10 +81,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Boolean jsonStoreEnabled;
Boolean includeEntitiesEnabled;
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ private static ExecutorService getExecutor() {
+ return Executors.newSingleThreadExecutor();
}
public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
@@ -244,11 +241,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
}
}
-
@Override
public void prepare(Object o) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ executor = getExecutor();
running.set(true);
try {
lock.writeLock().lock();
@@ -323,8 +319,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
.setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
.setOAuthAccessToken(config.getOauth().getAccessToken())
.setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
+ .setIncludeEntitiesEnabled(true)
+ .setJSONStoreEnabled(true)
.setAsyncNumThreads(3)
.setRestBaseURL(baseUrl)
.setIncludeMyRetweetEnabled(Boolean.TRUE)
[3/3] git commit: better comments and more optimal retry logic
Posted by sb...@apache.org.
better comments and more optimal retry logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7a65e1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7a65e1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7a65e1e
Branch: refs/heads/STREAMS-115
Commit: b7a65e1eaa9f523e3675361d9acde63906afac10
Parents: 8b12840
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 8 18:37:58 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 8 18:37:58 2014 -0500
----------------------------------------------------------------------
.../streams/twitter/provider/TwitterErrorHandler.java | 10 +++++-----
.../streams/twitter/provider/TwitterTimelineProvider.java | 3 +++
.../twitter/provider/TwitterTimelineProviderTask.java | 2 +-
3 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7a65e1e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 1f00129..3021701 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -24,14 +24,14 @@ import twitter4j.Twitter;
import twitter4j.TwitterException;
/**
- * Created by steveblackmon on 2/8/14.
+ * Handle expected and unexpected exceptions.
*/
public class TwitterErrorHandler
{
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
- protected static final long initial_backoff = 2*60*1000;
- protected static long backoff = initial_backoff;
+ // selected because 3 * 5 + n >= 15 for positive n
+ protected static final long retry = 3*60*1000;
public static int handleTwitterError(Twitter twitter, Exception exception)
{
@@ -42,7 +42,7 @@ public class TwitterErrorHandler
{
LOGGER.warn("Rate Limit Exceeded");
try {
- Thread.sleep(backoff *= 2);
+ Thread.sleep(retry);
} catch (InterruptedException e1) {}
return 1;
}
@@ -51,7 +51,7 @@ public class TwitterErrorHandler
LOGGER.info("Twitter Network Issues Detected. Backing off...");
LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage());
try {
- Thread.sleep(backoff *= 2);
+ Thread.sleep(retry);
} catch (InterruptedException e1) {}
return 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7a65e1e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 73ac03e..538e789 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -43,6 +43,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+/**
+ * Retrieve recent posts from a list of user ids or names.
+ */
public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public final static String STREAMS_ID = "TwitterTimelineProvider";
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7a65e1e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 09969d9..22e129e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -27,7 +27,7 @@ import twitter4j.*;
import java.util.List;
/**
- * Created by sblackmon on 12/10/13.
+ * Retrieve recent posts for a single user id.
*/
public class TwitterTimelineProviderTask implements Runnable {
[2/3] git commit: allow timeline provider to run as perpetual stream
Posted by sb...@apache.org.
allow timeline provider to run as perpetual stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/476748d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/476748d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/476748d4
Branch: refs/heads/STREAMS-115
Commit: 476748d4518ecfc960550e43bd3d25e312c731a4
Parents: c50ce91
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Jun 23 23:53:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 8 12:16:00 2014 -0500
----------------------------------------------------------------------
.../provider/TwitterTimelineProvider.java | 242 ++++++++++---------
.../provider/TwitterTimelineProviderTask.java | 54 ++---
2 files changed, 151 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f9d524..d3c4a51 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,20 +18,14 @@
package org.apache.streams.twitter.provider;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,18 +35,14 @@ import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public class TwitterTimelineProvider implements StreamsProvider, Serializable {
@@ -61,28 +51,28 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
public static final int MAX_NUMBER_WAITING = 10000;
- private TwitterStreamConfiguration config;
+ private TwitterUserInformationConfiguration config;
private Class klass;
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public TwitterStreamConfiguration getConfig() {
+ public TwitterUserInformationConfiguration getConfig() {
return config;
}
- public void setConfig(TwitterStreamConfiguration config) {
+ public void setConfig(TwitterUserInformationConfiguration config) {
this.config = config;
}
+ protected Iterator<Long[]> idsBatches;
+ protected Iterator<String[]> screenNameBatches;
+
protected volatile Queue<StreamsDatum> providerQueue;
protected int idsCount;
protected Twitter client;
- protected Iterator<Long> ids;
- ListenableFuture providerTaskComplete;
-
- protected ListeningExecutorService executor;
+ protected ExecutorService executor;
protected DateTime start;
protected DateTime end;
@@ -98,22 +88,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
- public TwitterTimelineProvider() {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config) {
+ public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
this.config = config;
}
- public TwitterTimelineProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- this.klass = klass;
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
+ public TwitterTimelineProvider(TwitterUserInformationConfiguration config, Class klass) {
this.config = config;
this.klass = klass;
}
@@ -125,101 +104,88 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
@Override
public void startStream() {
LOGGER.debug("{} startStream", STREAMS_ID);
- throw new org.apache.commons.lang.NotImplementedException();
- }
- protected void captureTimeline(long currentId) {
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
- Paging paging = new Paging(1, 200);
- List<Status> statuses = null;
- boolean KeepGoing = true;
- boolean hadFailure = false;
+ LOGGER.info("readCurrent");
- do
- {
- int keepTrying = 0;
+ while(idsBatches.hasNext())
+ loadBatch(idsBatches.next());
- // keep trying to load, give it 5 attempts.
- //This value was chosen because it seemed like a reasonable number of times
- //to retry capturing a timeline given the sorts of errors that could potentially
- //occur (network timeout/interruption, faulty client, etc.)
- while (keepTrying < 5)
- {
+ while(screenNameBatches.hasNext())
+ loadBatch(screenNameBatches.next());
- try
- {
- statuses = client.getUserTimeline(currentId, paging);
- for (Status tStat : statuses) {
- String json = TwitterObjectFactory.getRawJSON(tStat);
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- paging.setPage(paging.getPage() + 1);
-
- keepTrying = 10;
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
- }
- }
- }
- while (shouldContinuePulling(statuses));
+ executor.shutdown();
}
- private Map<Long, Long> userPullInfo;
-
protected boolean shouldContinuePulling(List<Status> statuses) {
return (statuses != null) && (statuses.size() > 0);
}
- private void sleep()
- {
- Thread.yield();
- try {
- // wait one tenth of a millisecond
- Thread.yield();
- Thread.sleep(1);
- Thread.yield();
- }
- catch(IllegalArgumentException e) {
- // passing in static values, this will never happen
- }
- catch(InterruptedException e) {
- // noOp, there must have been an issue sleeping
+ private void loadBatch(Long[] ids) {
+ Twitter client = getTwitterClient();
+ int keepTrying = 0;
+
+ // keep trying to load, give it 5 attempts.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+ try
+ {
+ long[] toQuery = new long[ids.length];
+ for(int i = 0; i < ids.length; i++)
+ toQuery[i] = ids[i];
+
+ for (User tStat : client.lookupUsers(toQuery)) {
+
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+ executor.submit(providerTask);
+
+ }
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
}
- Thread.yield();
}
- public StreamsResultSet readCurrent() {
- LOGGER.debug("{} readCurrent", STREAMS_ID);
-
- Preconditions.checkArgument(ids.hasNext());
- StreamsResultSet result;
+ private void loadBatch(String[] ids) {
+ Twitter client = getTwitterClient();
+ int keepTrying = 0;
- StreamsResultSet current;
+ // keep trying to load, give it 5 attempts.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+ try
+ {
+ for (User tStat : client.lookupUsers(ids)) {
- synchronized( TwitterTimelineProvider.class ) {
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+ executor.submit(providerTask);
- while( ids.hasNext() ) {
- Long currentId = ids.next();
- LOGGER.info("Provider Task Starting: {}", currentId);
- captureTimeline(currentId);
+ }
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
-
}
+ }
- LOGGER.info("Finished. Cleaning up...");
+ public StreamsResultSet readCurrent() {
LOGGER.info("Providing {} docs", providerQueue.size());
+ StreamsResultSet result;
+
try {
lock.writeLock().lock();
result = new StreamsResultSet(providerQueue);
@@ -228,8 +194,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
} finally {
lock.writeLock().unlock();
}
- running.set(false);
- LOGGER.info("Exiting");
+
+ if( providerQueue.isEmpty() && executor.isTerminated()) {
+ LOGGER.info("Finished. Cleaning up...");
+
+ running.set(false);
+
+ LOGGER.info("Exiting");
+ }
return result;
@@ -291,15 +263,55 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
Preconditions.checkNotNull(config.getOauth().getAccessToken());
Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getFollow());
+ Preconditions.checkNotNull(config.getInfo());
+
+ List<String> screenNames = new ArrayList<String>();
+ List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+ List<Long> ids = new ArrayList<Long>();
+ List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+ for(String s : config.getInfo()) {
+ if(s != null)
+ {
+ String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+ // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+ // screen name list
+ try {
+ ids.add(Long.parseLong(potentialScreenName));
+ } catch (Exception e) {
+ screenNames.add(potentialScreenName);
+ }
+
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+ if(ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<Long>();
+ }
+
+ if(screenNames.size() >= 100) {
+ // add the batch
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+ // reset the Ids
+ screenNames = new ArrayList<String>();
+ }
+ }
+ }
+
+
+ if(ids.size() > 0)
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
- idsCount = config.getFollow().size();
- ids = config.getFollow().iterator();
+ if(screenNames.size() > 0)
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
+ this.idsBatches = idsBatches.iterator();
+ this.screenNameBatches = screenNameBatches.iterator();
- client = getTwitterClient();
}
protected Twitter getTwitterClient()
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f295729..09969d9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,13 +18,11 @@
package org.apache.streams.twitter.provider;
-import org.joda.time.DateTime;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.json.DataObjectFactory;
+import twitter4j.*;
import java.util.List;
@@ -36,12 +34,12 @@ public class TwitterTimelineProviderTask implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
private TwitterTimelineProvider provider;
- private Twitter twitter;
+ private Twitter client;
private Long id;
public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
this.provider = provider;
- this.twitter = twitter;
+ this.client = twitter;
this.id = id;
}
@@ -58,47 +56,43 @@ public class TwitterTimelineProviderTask implements Runnable {
int keepTrying = 0;
// keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
+ //This value was chosen because it seemed like a reasonable number of times
+ //to retry capturing a timeline given the sorts of errors that could potentially
+ //occur (network timeout/interruption, faulty client, etc.)
+ while (keepTrying < 5)
{
try
{
- statuses = twitter.getUserTimeline(id, paging);
+ statuses = client.getUserTimeline(id, paging);
for (Status tStat : statuses)
{
- if( provider.start != null &&
- provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
- {
- // they hit the last date we wanted to collect
- // we can now exit early
- KeepGoing = false;
- }
- // emit the record
- String json = DataObjectFactory.getRawJSON(tStat);
-
- //provider.offer(json);
+ String json = TwitterObjectFactory.getRawJSON(tStat);
+ try {
+ provider.lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
+ } finally {
+ provider.lock.readLock().unlock();
+ }
}
-
paging.setPage(paging.getPage() + 1);
keepTrying = 10;
}
- catch(Exception e)
- {
- hadFailure = true;
- keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e);
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
}
}
- while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
-
- LOGGER.info("Provider Finished. Cleaning up...");
+ while (provider.shouldContinuePulling(statuses));
- LOGGER.info("Provider Exiting");
+ LOGGER.info(id + " Thread Finished");
}