You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 18:49:56 UTC
[5/7] git commit: incorporated PR feedback
incorporated PR feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19b380f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19b380f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19b380f9
Branch: refs/heads/master
Commit: 19b380f9930867e6194f5f09ee97708ac4f2d61e
Parents: 1ac7fc7
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Jul 25 13:59:13 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Jul 25 13:59:13 2014 -0500
----------------------------------------------------------------------
.../provider/TwitterTimelineProvider.java | 177 +++++++++----------
.../provider/TwitterTimelineProviderTask.java | 8 +-
2 files changed, 82 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19b380f9/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 23b38bf..a7b39c1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,12 +19,15 @@
package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,10 +37,7 @@ import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
@@ -66,8 +66,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
this.config = config;
}
- protected Iterator<Long[]> idsBatches;
- protected Iterator<String[]> screenNameBatches;
+ protected Collection<String[]> screenNameBatches;
+ protected Collection<Long> ids;
protected volatile Queue<StreamsDatum> providerQueue;
@@ -105,15 +105,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public void startStream() {
LOGGER.debug("{} startStream", STREAMS_ID);
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+ Preconditions.checkArgument(!ids.isEmpty());
LOGGER.info("readCurrent");
- while(idsBatches.hasNext())
- loadBatch(idsBatches.next());
+ submitTimelineThreads(ids.toArray(new Long[0]));
- while(screenNameBatches.hasNext())
- loadBatch(screenNameBatches.next());
+ running.set(true);
executor.shutdown();
}
@@ -122,62 +120,30 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
return (statuses != null) && (statuses.size() > 0);
}
- private void loadBatch(Long[] ids) {
+ private void submitTimelineThreads(Long[] ids) {
Twitter client = getTwitterClient();
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
- {
- try
- {
- long[] toQuery = new long[ids.length];
- for(int i = 0; i < ids.length; i++)
- toQuery[i] = ids[i];
- for (User tStat : client.lookupUsers(toQuery)) {
+ for(int i = 0; i < ids.length; i++) {
- TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
- executor.submit(providerTask);
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+ executor.submit(providerTask);
- }
- keepTrying = 10;
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
- }
}
+
}
- private void loadBatch(String[] ids) {
+ private Collection<Long> retrieveIds(String[] screenNames) {
Twitter client = getTwitterClient();
- int keepTrying = 0;
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
- {
- try
- {
- for (User tStat : client.lookupUsers(ids)) {
-
- TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
- executor.submit(providerTask);
-
- }
- keepTrying = 10;
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ List<Long> ids = Lists.newArrayList();
+ try {
+ for (User tStat : client.lookupUsers(screenNames)) {
+ ids.add(tStat.getId());
}
+ } catch (TwitterException e) {
+ LOGGER.error("Failure retrieving user details.", e.getMessage());
}
+ return ids;
}
public StreamsResultSet readCurrent() {
@@ -248,7 +214,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public void prepare(Object o) {
executor = getExecutor();
- running.set(true);
+
try {
lock.writeLock().lock();
providerQueue = constructQueue();
@@ -264,52 +230,24 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
Preconditions.checkNotNull(config.getInfo());
- List<String> screenNames = new ArrayList<String>();
- List<String[]> screenNameBatches = new ArrayList<String[]>();
-
- List<Long> ids = new ArrayList<Long>();
- List<Long[]> idsBatches = new ArrayList<Long[]>();
-
- for(String s : config.getInfo()) {
- if(s != null)
- {
- String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
-
- // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
- // screen name list
- try {
- ids.add(Long.parseLong(potentialScreenName));
- } catch (NumberFormatException e) {
- screenNames.add(potentialScreenName);
- }
-
- // Twitter allows for batches up to 100 per request, but you cannot mix types
-
- if(ids.size() >= 100) {
- // add the batch
- idsBatches.add(ids.toArray(new Long[ids.size()]));
- // reset the Ids
- ids = new ArrayList<Long>();
- }
+ ImmutableList<String> screenNames = ImmutableList.copyOf(screenNamesOnly(config.getInfo()));
+ List<Long> ids = numericIdsOnly(config.getInfo());
- if(screenNames.size() >= 100) {
- // add the batch
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- // reset the Ids
- screenNames = new ArrayList<String>();
- }
- }
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+ while(screenNames.size() >= 100) {
+ screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
+ screenNames = screenNames.subList(100, screenNames.size());
}
-
- if(ids.size() > 0)
- idsBatches.add(ids.toArray(new Long[ids.size()]));
-
if(screenNames.size() > 0)
screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- this.idsBatches = idsBatches.iterator();
- this.screenNameBatches = screenNameBatches.iterator();
+ Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
+
+ while(screenNameBatchIterator.hasNext()) {
+ Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
+ ids.addAll(batchIds);
+ }
}
@@ -336,4 +274,49 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public void cleanUp() {
shutdownAndAwaitTermination(executor);
}
+
+ protected List<Long> numericIdsOnly(List<String> allIds) {
+ List<Long> result = Lists.newArrayList();
+ for(String id : allIds) {
+ if(id != null)
+ {
+ // See if it is a long; if it is, add it to the user ID list. Entries that do not parse
+ // as a long are ignored here (screenNamesOnly collects them as screen names).
+ try {
+ result.add(Long.parseLong(id));
+ } catch (NumberFormatException e) {}
+
+ }
+ }
+ return result;
+ }
+
+ protected List<String> screenNamesOnly(List<String> allIds) {
+ List<String> result = Lists.newArrayList();
+ for(String id : allIds) {
+ if(id != null)
+ {
+ String potentialScreenName = id.replaceAll("@", "").trim().toLowerCase();
+
+ // See if it is a long; if it is, skip it here (numericIdsOnly collects numeric user IDs).
+ // If it is not, add the normalized value to the screen name list.
+ try {
+ Long.parseLong(id);
+ } catch (NumberFormatException e) {
+ result.add(potentialScreenName);
+ }
+
+ }
+ }
+ return result;
+ }
+
+ protected void addDatum(StreamsDatum datum) {
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19b380f9/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 22e129e..0ad8ac3 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -70,12 +70,8 @@ public class TwitterTimelineProviderTask implements Runnable {
{
String json = TwitterObjectFactory.getRawJSON(tStat);
- try {
- provider.lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
- } finally {
- provider.lock.readLock().unlock();
- }
+ provider.addDatum(new StreamsDatum(json));
+
}
paging.setPage(paging.getPage() + 1);