You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 18:49:56 UTC

[5/7] git commit: incorporated PR feedback

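Incorporated PR feedback: prepare() now resolves configured screen names to numeric
user ids up front, startStream() submits one TwitterTimelineProviderTask per id, and
queue insertion moves into a new TwitterTimelineProvider.addDatum() helper.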


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19b380f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19b380f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19b380f9

Branch: refs/heads/master
Commit: 19b380f9930867e6194f5f09ee97708ac4f2d61e
Parents: 1ac7fc7
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Jul 25 13:59:13 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Jul 25 13:59:13 2014 -0500

----------------------------------------------------------------------
 .../provider/TwitterTimelineProvider.java       | 177 +++++++++----------
 .../provider/TwitterTimelineProviderTask.java   |   8 +-
 2 files changed, 82 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19b380f9/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 23b38bf..a7b39c1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,12 +19,15 @@
 package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,10 +37,7 @@ import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -66,8 +66,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         this.config = config;
     }
 
-    protected Iterator<Long[]> idsBatches;
-    protected Iterator<String[]> screenNameBatches;
+    protected Collection<String[]> screenNameBatches;
+    protected Collection<Long> ids;
 
     protected volatile Queue<StreamsDatum> providerQueue;
 
@@ -105,15 +105,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public void startStream() {
         LOGGER.debug("{} startStream", STREAMS_ID);
 
-        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+        Preconditions.checkArgument(!ids.isEmpty());
 
         LOGGER.info("readCurrent");
 
-        while(idsBatches.hasNext())
-            loadBatch(idsBatches.next());
+        submitTimelineThreads(ids.toArray(new Long[0]));
 
-        while(screenNameBatches.hasNext())
-            loadBatch(screenNameBatches.next());
+        running.set(true);
 
         executor.shutdown();
     }
@@ -122,62 +120,30 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         return (statuses != null) && (statuses.size() > 0);
     }
 
-    private void loadBatch(Long[] ids) {
+    private void submitTimelineThreads(Long[] ids) {
         Twitter client = getTwitterClient();
-        int keepTrying = 0;
-
-        // keep trying to load, give it 5 attempts.
-        //while (keepTrying < 10)
-        while (keepTrying < 1)
-        {
-            try
-            {
-                long[] toQuery = new long[ids.length];
-                for(int i = 0; i < ids.length; i++)
-                    toQuery[i] = ids[i];
 
-                for (User tStat : client.lookupUsers(toQuery)) {
+        for(int i = 0; i < ids.length; i++) {
 
-                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-                    executor.submit(providerTask);
+            TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+            executor.submit(providerTask);
 
-                }
-                keepTrying = 10;
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-            }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-            }
         }
+
     }
 
-    private void loadBatch(String[] ids) {
+    private Collection<Long> retrieveIds(String[] screenNames) {
         Twitter client = getTwitterClient();
-        int keepTrying = 0;
 
-        // keep trying to load, give it 5 attempts.
-        //while (keepTrying < 10)
-        while (keepTrying < 1)
-        {
-            try
-            {
-                for (User tStat : client.lookupUsers(ids)) {
-
-                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-                    executor.submit(providerTask);
-
-                }
-                keepTrying = 10;
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-            }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+        List<Long> ids = Lists.newArrayList();
+        try {
+            for (User tStat : client.lookupUsers(screenNames)) {
+                ids.add(tStat.getId());
             }
+        } catch (TwitterException e) {
+            LOGGER.error("Failure retrieving user details.", e.getMessage());
         }
+        return ids;
     }
 
     public StreamsResultSet readCurrent() {
@@ -248,7 +214,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public void prepare(Object o) {
 
         executor = getExecutor();
-        running.set(true);
+
         try {
             lock.writeLock().lock();
             providerQueue = constructQueue();
@@ -264,52 +230,24 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
         Preconditions.checkNotNull(config.getInfo());
 
-        List<String> screenNames = new ArrayList<String>();
-        List<String[]> screenNameBatches = new ArrayList<String[]>();
-
-        List<Long> ids = new ArrayList<Long>();
-        List<Long[]> idsBatches = new ArrayList<Long[]>();
-
-        for(String s : config.getInfo()) {
-            if(s != null)
-            {
-                String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
-
-                // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
-                // screen name list
-                try {
-                    ids.add(Long.parseLong(potentialScreenName));
-                } catch (NumberFormatException e) {
-                    screenNames.add(potentialScreenName);
-                }
-
-                // Twitter allows for batches up to 100 per request, but you cannot mix types
-
-                if(ids.size() >= 100) {
-                    // add the batch
-                    idsBatches.add(ids.toArray(new Long[ids.size()]));
-                    // reset the Ids
-                    ids = new ArrayList<Long>();
-                }
+        ImmutableList<String> screenNames = ImmutableList.copyOf(screenNamesOnly(config.getInfo()));
+        List<Long> ids = numericIdsOnly(config.getInfo());
 
-                if(screenNames.size() >= 100) {
-                    // add the batch
-                    screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-                    // reset the Ids
-                    screenNames = new ArrayList<String>();
-                }
-            }
+        // Twitter allows for batches up to 100 per request, but you cannot mix types
+        while(screenNames.size() >= 100) {
+            screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
+            screenNames = screenNames.subList(100, screenNames.size());
         }
 
-
-        if(ids.size() > 0)
-            idsBatches.add(ids.toArray(new Long[ids.size()]));
-
         if(screenNames.size() > 0)
             screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
 
-        this.idsBatches = idsBatches.iterator();
-        this.screenNameBatches = screenNameBatches.iterator();
+        Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
+
+        while(screenNameBatchIterator.hasNext()) {
+            Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
+            ids.addAll(batchIds);
+        }
 
     }
 
@@ -336,4 +274,49 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public void cleanUp() {
         shutdownAndAwaitTermination(executor);
     }
+
+    protected List<Long> numericIdsOnly(List<String> allIds) {
+        List<Long> result = Lists.newArrayList();
+        for(String id : allIds) {
+            if(id != null)
+            {
+                // Keep only entries that parse as a long (numeric user ids); non-numeric
+                // entries are screen names and are collected by screenNamesOnly() instead
+                try {
+                    result.add(Long.parseLong(id));
+                } catch (NumberFormatException e) {}
+
+            }
+        }
+        return result;
+    }
+
+    protected List<String> screenNamesOnly(List<String> allIds) {
+        List<String> result = Lists.newArrayList();
+        for(String id : allIds) {
+            if(id != null)
+            {
+                String potentialScreenName = id.replaceAll("@", "").trim().toLowerCase();
+
+                // Keep only entries whose raw value does NOT parse as a long (screen names);
+                // numeric entries are user ids and are collected by numericIdsOnly() instead
+                try {
+                    Long.parseLong(id);
+                } catch (NumberFormatException e) {
+                    result.add(potentialScreenName);
+                }
+
+            }
+        }
+        return result;
+    }
+
+    protected void addDatum(StreamsDatum datum) {
+        try {
+            lock.readLock().lock();
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19b380f9/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 22e129e..0ad8ac3 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -70,12 +70,8 @@ public class TwitterTimelineProviderTask implements Runnable {
                     {
                         String json = TwitterObjectFactory.getRawJSON(tStat);
 
-                        try {
-                            provider.lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
-                        } finally {
-                            provider.lock.readLock().unlock();
-                        }
+                        provider.addDatum(new StreamsDatum(json));
+
                     }
 
                     paging.setPage(paging.getPage() + 1);