You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/09 01:38:18 UTC

[2/3] git commit: allow timeline provider to run as perpetual stream

allow timeline provider to run as perpetual stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/476748d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/476748d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/476748d4

Branch: refs/heads/STREAMS-115
Commit: 476748d4518ecfc960550e43bd3d25e312c731a4
Parents: c50ce91
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Jun 23 23:53:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 8 12:16:00 2014 -0500

----------------------------------------------------------------------
 .../provider/TwitterTimelineProvider.java       | 242 ++++++++++---------
 .../provider/TwitterTimelineProviderTask.java   |  54 ++---
 2 files changed, 151 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f9d524..d3c4a51 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,20 +18,14 @@
 
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,18 +35,14 @@ import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
@@ -61,28 +51,28 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
     public static final int MAX_NUMBER_WAITING = 10000;
 
-    private TwitterStreamConfiguration config;
+    private TwitterUserInformationConfiguration config;
 
     private Class klass;
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public TwitterStreamConfiguration getConfig() {
+    public TwitterUserInformationConfiguration getConfig() {
         return config;
     }
 
-    public void setConfig(TwitterStreamConfiguration config) {
+    public void setConfig(TwitterUserInformationConfiguration config) {
         this.config = config;
     }
 
+    protected Iterator<Long[]> idsBatches; // numeric user ids from config.getInfo(), in lookup batches of at most 100
+    protected Iterator<String[]> screenNameBatches; // non-numeric entries ('@' stripped, lowercased), batched the same way
+
     protected volatile Queue<StreamsDatum> providerQueue;
 
     protected int idsCount;
     protected Twitter client;
-    protected Iterator<Long> ids;
 
-    ListenableFuture providerTaskComplete;
-
-    protected ListeningExecutorService executor;
+    protected ExecutorService executor; // runs one TwitterTimelineProviderTask per looked-up user
 
     protected DateTime start;
     protected DateTime end;
@@ -98,22 +88,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
-    public TwitterTimelineProvider() {
-        Config config = StreamsConfigurator.config.getConfig("twitter");
-        this.config = TwitterStreamConfigurator.detectConfiguration(config);
-    }
-
-    public TwitterTimelineProvider(TwitterStreamConfiguration config) {
+    public TwitterTimelineProvider(TwitterUserInformationConfiguration config) { // config is now mandatory: the StreamsConfigurator-based constructors were removed
         this.config = config;
     }
 
-    public TwitterTimelineProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("twitter");
-        this.config = TwitterStreamConfigurator.detectConfiguration(config);
-        this.klass = klass;
-    }
-
-    public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
+    public TwitterTimelineProvider(TwitterUserInformationConfiguration config, Class klass) {
         this.config = config;
         this.klass = klass;
     }
@@ -125,101 +104,88 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     @Override
     public void startStream() {
         LOGGER.debug("{} startStream", STREAMS_ID);
-        throw new org.apache.commons.lang.NotImplementedException();
-    }
 
-    protected void captureTimeline(long currentId) {
+        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); // prepare() must have found at least one id or screen name
 
-        Paging paging = new Paging(1, 200);
-        List<Status> statuses = null;
-        boolean KeepGoing = true;
-        boolean hadFailure = false;
+        LOGGER.info("startStream");
 
-        do
-        {
-            int keepTrying = 0;
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
 
-            // keep trying to load, give it 5 attempts.
-            //This value was chosen because it seemed like a reasonable number of times
-            //to retry capturing a timeline given the sorts of errors that could potentially
-            //occur (network timeout/interruption, faulty client, etc.)
-            while (keepTrying < 5)
-            {
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
 
-                try
-                {
-                    statuses = client.getUserTimeline(currentId, paging);
-                    for (Status tStat : statuses) {
-                        String json = TwitterObjectFactory.getRawJSON(tStat);
-                        try {
-                            lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
-                        } finally {
-                            lock.readLock().unlock();
-                        }
-                    }
-
-                    paging.setPage(paging.getPage() + 1);
-
-                    keepTrying = 10;
-                }
-                catch(TwitterException twitterException) {
-                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-                }
-                catch(Exception e) {
-                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-                }
-            }
-        }
-        while (shouldContinuePulling(statuses));
+        executor.shutdown(); // accept no new tasks; readCurrent() stops the provider once the executor has terminated
     }
 
-    private Map<Long, Long> userPullInfo;
-
     protected boolean shouldContinuePulling(List<Status> statuses) {
         return (statuses != null) && (statuses.size() > 0);
     }
 
-    private void sleep()
-    {
-        Thread.yield();
-        try {
-            // wait one tenth of a millisecond
-            Thread.yield();
-            Thread.sleep(1);
-            Thread.yield();
-        }
-        catch(IllegalArgumentException e) {
-            // passing in static values, this will never happen
-        }
-        catch(InterruptedException e) {
-            // noOp, there must have been an issue sleeping
+    private void loadBatch(Long[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
+
+        // keep trying to load, give it 5 attempts (the same budget TwitterTimelineProviderTask uses):
+        // the error handler adds to keepTrying on each failure, and success sets it to 10.
+        while (keepTrying < 5)
+        {
+            try
+            {
+                long[] toQuery = new long[ids.length];
+                for(int i = 0; i < ids.length; i++)
+                    toQuery[i] = ids[i];
+
+                for (User tStat : client.lookupUsers(toQuery)) {
+
+                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+                    executor.submit(providerTask);
+
+                }
+                keepTrying = 10;
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
         }
-        Thread.yield();
     }
 
-    public StreamsResultSet readCurrent() {
-        LOGGER.debug("{} readCurrent", STREAMS_ID);
-
-        Preconditions.checkArgument(ids.hasNext());
-        StreamsResultSet result;
+    private void loadBatch(String[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
 
-        StreamsResultSet current;
+        // keep trying to load, give it 5 attempts (the same budget TwitterTimelineProviderTask uses):
+        // the error handler adds to keepTrying on each failure, and success sets it to 10.
+        while (keepTrying < 5)
+        {
+            try
+            {
+                for (User tStat : client.lookupUsers(ids)) {
 
-        synchronized( TwitterTimelineProvider.class ) {
+                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+                    executor.submit(providerTask);
 
-            while( ids.hasNext() ) {
-                Long currentId = ids.next();
-                LOGGER.info("Provider Task Starting: {}", currentId);
-                captureTimeline(currentId);
+                }
+                keepTrying = 10;
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
             }
-
         }
+    }
 
-        LOGGER.info("Finished.  Cleaning up...");
+    public StreamsResultSet readCurrent() {
 
         LOGGER.info("Providing {} docs", providerQueue.size());
 
+        StreamsResultSet result;
+
         try {
             lock.writeLock().lock();
             result = new StreamsResultSet(providerQueue);
@@ -228,8 +194,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         } finally {
             lock.writeLock().unlock();
         }
-        running.set(false);
-        LOGGER.info("Exiting");
+        // check isTerminated() first: only once no task can still offer is an empty queue guaranteed to stay empty
+        if( executor.isTerminated() && providerQueue.isEmpty()) {
+            LOGGER.info("Finished.  Cleaning up...");
+
+            running.set(false);
+
+            LOGGER.info("Exiting");
+        }
 
         return result;
 
@@ -291,15 +263,55 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
         Preconditions.checkNotNull(config.getOauth().getAccessToken());
         Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getFollow());
+        Preconditions.checkNotNull(config.getInfo());
+
+        List<String> screenNames = new ArrayList<String>();
+        List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+        List<Long> ids = new ArrayList<Long>();
+        List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+        for(String s : config.getInfo()) {
+            if(s != null)
+            {
+                String potentialScreenName = s.replace("@", "").trim().toLowerCase(java.util.Locale.ROOT);
+
+                // See if it is a long: if it is, add it to the user id list; if it is not, add it to the
+                // screen name list
+                try {
+                    ids.add(Long.parseLong(potentialScreenName));
+                } catch (NumberFormatException e) {
+                    screenNames.add(potentialScreenName);
+                }
+
+                // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+                if(ids.size() >= 100) {
+                    // add the batch
+                    idsBatches.add(ids.toArray(new Long[ids.size()]));
+                    // reset the Ids
+                    ids = new ArrayList<Long>();
+                }
+
+                if(screenNames.size() >= 100) {
+                    // add the batch
+                    screenNameBatches.add(screenNames.toArray(new String[screenNames.size()]));
+                    // reset the screen names
+                    screenNames = new ArrayList<String>();
+                }
+            }
+        }
+
+        // flush any partially filled batches left over from the loop
+        if(ids.size() > 0)
+            idsBatches.add(ids.toArray(new Long[ids.size()]));
 
-        idsCount = config.getFollow().size();
-        ids = config.getFollow().iterator();
+        if(screenNames.size() > 0)
+            screenNameBatches.add(screenNames.toArray(new String[screenNames.size()]));
 
-        jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
+        this.idsBatches = idsBatches.iterator();
+        this.screenNameBatches = screenNameBatches.iterator();
 
-        client = getTwitterClient();
     }
 
     protected Twitter getTwitterClient()

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/476748d4/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f295729..09969d9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,13 +18,11 @@
 
 package org.apache.streams.twitter.provider;
 
-import org.joda.time.DateTime;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.json.DataObjectFactory;
+import twitter4j.*;
 
 import java.util.List;
 
@@ -36,12 +34,12 @@ public class TwitterTimelineProviderTask implements Runnable {
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
     private TwitterTimelineProvider provider;
-    private Twitter twitter;
+    private Twitter client; // renamed from 'twitter'; the constructor parameter still carries the old name
     private Long id;
 
     public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
         this.provider = provider;
-        this.twitter = twitter;
+        this.client = twitter;
         this.id = id;
     }
 
@@ -58,47 +56,43 @@ public class TwitterTimelineProviderTask implements Runnable {
             int keepTrying = 0;
 
             // keep trying to load, give it 5 attempts.
-            //while (keepTrying < 10)
-            while (keepTrying < 1)
+            // 5 attempts is a reasonable retry budget for transient errors (network timeout/interruption, faulty client, etc.).
+            // Reset statuses first so a page that fails every attempt ends the outer loop instead of being re-pulled forever.
+            statuses = null;
+            while (keepTrying < 5)
             {
 
                 try
                 {
-                    statuses = twitter.getUserTimeline(id, paging);
+                    statuses = client.getUserTimeline(id, paging);
 
                     for (Status tStat : statuses)
                     {
-                        if( provider.start != null &&
-                            provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
-                        {
-                            // they hit the last date we wanted to collect
-                            // we can now exit early
-                            KeepGoing = false;
-                        }
-                        // emit the record
-                        String json = DataObjectFactory.getRawJSON(tStat);
-
-                        //provider.offer(json);
+                        String json = TwitterObjectFactory.getRawJSON(tStat);
 
+                        provider.lock.readLock().lock();
+                        try {
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
+                        } finally {
+                            provider.lock.readLock().unlock();
+                        }
                     }
 
-
                     paging.setPage(paging.getPage() + 1);
 
                     keepTrying = 10;
                 }
-                catch(Exception e)
-                {
-                    hadFailure = true;
-                    keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e);
+                catch(TwitterException twitterException) {
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+                }
+                catch(Exception e) {
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
                 }
             }
         }
-        while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
-
-        LOGGER.info("Provider Finished.  Cleaning up...");
+        while (provider.shouldContinuePulling(statuses));
 
-        LOGGER.info("Provider Exiting");
+        LOGGER.info("{} Thread Finished", id);
 
     }