You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/12/17 19:58:01 UTC

incubator-streams git commit: streams-provider-twitter bug fixes & related improvements.

Repository: incubator-streams
Updated Branches:
  refs/heads/master 92c5504dc -> 5e1bd10be


streams-provider-twitter bug fixes & related improvements.

STREAMS-465
STREAMS-466
STREAMS-467


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5e1bd10b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5e1bd10b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5e1bd10b

Branch: refs/heads/master
Commit: 5e1bd10be6b2b45b91c0cdc26e22508d6445e021
Parents: 92c5504
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Dec 13 15:29:28 2016 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sat Dec 17 13:30:12 2016 -0600

----------------------------------------------------------------------
 .../provider/TwitterFollowingProvider.java      |  5 ++
 .../provider/TwitterFollowingProviderTask.java  | 65 +++++++++++++-------
 .../provider/TwitterTimelineProvider.java       | 22 +++----
 .../provider/TwitterTimelineProviderTask.java   | 29 ++++++---
 .../twitter/TwitterFollowingConfiguration.json  | 15 +++++
 .../TwitterTimelineProviderConfiguration.json   | 23 +++++++
 .../TwitterUserInformationConfiguration.json    |  6 +-
 7 files changed, 119 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 40a0957..16b6c03 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -38,6 +38,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import twitter4j.Status;
 import twitter4j.Twitter;
 
 import java.io.BufferedOutputStream;
@@ -215,6 +216,10 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
   }
 
+  public boolean shouldContinuePulling(List<twitter4j.User> users) {
+    return (users != null) && (users.size() == config.getPageSize());
+  }
+
   @Override
   public boolean isRunning() {
     if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index 1607da4..313416a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -33,6 +33,7 @@ import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterObjectFactory;
 
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -75,6 +76,8 @@ public class TwitterFollowingProviderTask implements Runnable {
     this.screenName = screenName;
   }
 
+  int page_count = 0;
+  int item_count = 0;
 
   @Override
   public void run() {
@@ -119,26 +122,27 @@ public class TwitterFollowingProviderTask implements Runnable {
 
   private void collectUsers(Long id) {
     int keepTrying = 0;
-
+    List<twitter4j.User> list = null;
     long curser = -1;
 
+    twitter4j.User user;
+    String userJson;
+    try {
+      user = client.users().showUser(id);
+      userJson = TwitterObjectFactory.getRawJSON(user);
+    } catch (TwitterException ex) {
+      LOGGER.error("Failure looking up " + id);
+      return;
+    }
+
     do {
       try {
-        twitter4j.User user;
-        String userJson;
-        try {
-          user = client.users().showUser(id);
-          userJson = TwitterObjectFactory.getRawJSON(user);
-        } catch (TwitterException ex) {
-          LOGGER.error("Failure looking up " + id);
-          break;
-        }
 
-        PagableResponseList<twitter4j.User> list = null;
+        PagableResponseList<twitter4j.User> page = null;
         if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          list = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getMaxItems().intValue());
+          page = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getPageSize().intValue());
         } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          list = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getMaxItems().intValue());
+          page = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getPageSize().intValue());
         }
 
         Objects.requireNonNull(list);
@@ -162,27 +166,28 @@ public class TwitterFollowingProviderTask implements Runnable {
 
             Objects.requireNonNull(follow);
 
-            if ( count < provider.getConfig().getMaxItems()) {
+            if ( item_count < provider.getConfig().getMaxItems()) {
               ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              count++;
+              item_count++;
             }
 
           } catch (Exception ex) {
             LOGGER.warn("Exception: {}", ex);
           }
         }
-        if ( !list.hasNext() ) {
+        if ( !page.hasNext() ) {
           break;
         }
-        if ( list.getNextCursor() == 0 ) {
+        if ( page.getNextCursor() == 0 ) {
           break;
         }
-        curser = list.getNextCursor();
+        curser = page.getNextCursor();
+        page_count++;
       } catch (Exception ex) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
       }
     }
-    while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+    while (provider.shouldContinuePulling(list) && curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
   }
 
   private void collectIds(Long id) {
@@ -190,6 +195,16 @@ public class TwitterFollowingProviderTask implements Runnable {
 
     long curser = -1;
 
+    twitter4j.User user;
+    String userJson;
+    try {
+      user = client.users().showUser(id);
+      userJson = TwitterObjectFactory.getRawJSON(user);
+    } catch (TwitterException ex) {
+      LOGGER.error("Failure looking up " + id);
+      return;
+    }
+
     do {
       try {
         twitter4j.IDs ids = null;
@@ -218,9 +233,9 @@ public class TwitterFollowingProviderTask implements Runnable {
 
             Objects.requireNonNull(follow);
 
-            if ( count < provider.getConfig().getMaxItems()) {
+            if ( item_count < provider.getConfig().getMaxItems()) {
               ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              count++;
+              item_count++;
             }
           } catch (Exception ex) {
             LOGGER.warn("Exception: {}", ex);
@@ -233,13 +248,19 @@ public class TwitterFollowingProviderTask implements Runnable {
           break;
         }
         curser = ids.getNextCursor();
+        page_count++;
       } catch (TwitterException twitterException) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
       } catch (Exception ex) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
       }
     }
-    while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+    while (shouldContinuePulling() && curser != 0 && keepTrying < provider.getConfig().getRetryMax() );
+  }
+
+  public boolean shouldContinuePulling() {
+    return ( item_count < provider.getConfig().getMaxItems()
+              && page_count < provider.getConfig().getMaxPages());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 88642f8..9778de0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -26,7 +26,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -84,18 +84,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   public static final int MAX_NUMBER_WAITING = 10000;
 
-  private TwitterUserInformationConfiguration config;
+  private TwitterTimelineProviderConfiguration config;
 
   protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  public TwitterUserInformationConfiguration getConfig() {
+  public TwitterTimelineProviderConfiguration getConfig() {
     return config;
   }
 
-  public void setConfig(TwitterUserInformationConfiguration config) {
-    this.config = config;
-  }
-
   protected Collection<String[]> screenNameBatches;
   protected Collection<Long> ids;
 
@@ -153,7 +149,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     Config typesafe  = testResourceConfig.withFallback(reference).resolve();
 
     StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-    TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+    TwitterTimelineProviderConfiguration config = new ComponentConfigurator<>(TwitterTimelineProviderConfiguration.class).detectConfiguration(typesafe, "twitter");
     TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
 
     ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
@@ -178,7 +174,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     outStream.flush();
   }
 
-  public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
+  public TwitterTimelineProvider(TwitterTimelineProviderConfiguration config) {
     this.config = config;
   }
 
@@ -194,6 +190,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
   @Override
   public void prepare(Object configurationObject) {
 
+    if( configurationObject instanceof TwitterTimelineProviderConfiguration ) {
+      this.config = (TwitterTimelineProviderConfiguration)configurationObject;
+    }
+
     try {
       lock.writeLock().lock();
       providerQueue = constructQueue();
@@ -232,10 +232,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   }
 
-  public boolean shouldContinuePulling(List<Status> statuses) {
-    return (statuses != null) && (statuses.size() > 0);
-  }
-
   protected void submitTimelineThreads(Long[] ids) {
 
     Twitter client = getTwitterClient();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 67ccf0c..8cb2b46 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Paging;
+import twitter4j.ResponseList;
 import twitter4j.Status;
 import twitter4j.Twitter;
 import twitter4j.TwitterObjectFactory;
@@ -60,12 +61,14 @@ public class TwitterTimelineProviderTask implements Runnable {
     this.id = id;
   }
 
+  int page_count = 1;
+  int item_count = 0;
+  List<Status> lastPage = null;
+
   @Override
   public void run() {
 
-    Paging paging = new Paging(1, 200);
-    List<Status> statuses = null;
-    int count = 0;
+    Paging paging = new Paging(page_count, provider.getConfig().getPageSize().intValue());
 
     LOGGER.info(id + " Thread Starting");
 
@@ -81,25 +84,27 @@ public class TwitterTimelineProviderTask implements Runnable {
         try {
           this.client = provider.getTwitterClient();
 
-          statuses = client.getUserTimeline(id, paging);
+          ResponseList<Status> statuses = client.getUserTimeline(id, paging);
 
           for (Status twitterStatus : statuses) {
 
             String json = TwitterObjectFactory.getRawJSON(twitterStatus);
 
-            if ( count < provider.getConfig().getMaxItems() ) {
+            if ( item_count < provider.getConfig().getMaxItems() ) {
               try {
                 org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
                 ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
               } catch (Exception exception) {
                 LOGGER.warn("Failed to read document as Tweet ", twitterStatus);
               }
-              count++;
+              item_count++;
             }
 
           }
 
-          paging.setPage(paging.getPage() + 1);
+          lastPage = statuses;
+          page_count = paging.getPage() + 1;
+          paging.setPage(page_count);
 
           keepTrying = 10;
         } catch (Exception ex) {
@@ -107,10 +112,18 @@ public class TwitterTimelineProviderTask implements Runnable {
         }
       }
     }
-    while (provider.shouldContinuePulling(statuses) && count < provider.getConfig().getMaxItems());
+    while (shouldContinuePulling());
 
     LOGGER.info(id + " Thread Finished");
 
   }
 
+  public boolean shouldContinuePulling() {
+    return (lastPage != null)
+        && item_count < provider.getConfig().getMaxItems()
+        && page_count <= provider.getConfig().getMaxPages();
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
index dda5d1b..89fc7af 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
@@ -13,6 +13,21 @@
             "type": "boolean",
             "description": "Whether to collect ids only, or full profiles",
             "default": "true"
+        },
+        "max_items": {
+            "type": "integer",
+            "description": "Max items per user to collect",
+            "default": 50000
+        },
+        "max_pages": {
+            "type": "integer",
+            "description": "Max pages per user to request",
+            "default": 10
+        },
+        "page_size": {
+            "type": "integer",
+            "description": "Max items per page to request",
+            "default": 5000
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
new file mode 100644
index 0000000..37ed60e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
@@ -0,0 +1,23 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
+    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "max_items": {
+            "type": "integer",
+            "description": "Max items per user to collect",
+            "default": 3200
+        },
+        "max_pages": {
+            "type": "integer",
+            "description": "Max pages per user to request",
+            "default": 16
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
index 01b1c26..405c87a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
@@ -16,10 +16,10 @@
                 "type": "string"
             }
         },
-        "max_items": {
+        "page_size": {
             "type": "integer",
-            "description": "Max items per user to collect",
-            "default": 5000
+            "description": "Max items per page to request",
+            "default": 200
         }
     }
 }
\ No newline at end of file