You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/12/17 19:58:01 UTC
incubator-streams git commit: streams-provider-twitter bug fixes &
related improvements.
Repository: incubator-streams
Updated Branches:
refs/heads/master 92c5504dc -> 5e1bd10be
streams-provider-twitter bug fixes & related improvements.
STREAMS-465
STREAMS-466
STREAMS-467
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5e1bd10b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5e1bd10b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5e1bd10b
Branch: refs/heads/master
Commit: 5e1bd10be6b2b45b91c0cdc26e22508d6445e021
Parents: 92c5504
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Dec 13 15:29:28 2016 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sat Dec 17 13:30:12 2016 -0600
----------------------------------------------------------------------
.../provider/TwitterFollowingProvider.java | 5 ++
.../provider/TwitterFollowingProviderTask.java | 65 +++++++++++++-------
.../provider/TwitterTimelineProvider.java | 22 +++----
.../provider/TwitterTimelineProviderTask.java | 29 ++++++---
.../twitter/TwitterFollowingConfiguration.json | 15 +++++
.../TwitterTimelineProviderConfiguration.json | 23 +++++++
.../TwitterUserInformationConfiguration.json | 6 +-
7 files changed, 119 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 40a0957..16b6c03 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -38,6 +38,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import twitter4j.Status;
import twitter4j.Twitter;
import java.io.BufferedOutputStream;
@@ -215,6 +216,10 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
+ public boolean shouldContinuePulling(List<twitter4j.User> users) {
+ return (users != null) && (users.size() == config.getPageSize());
+ }
+
@Override
public boolean isRunning() {
if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index 1607da4..313416a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -33,6 +33,7 @@ import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
+import java.util.List;
import java.util.Objects;
/**
@@ -75,6 +76,8 @@ public class TwitterFollowingProviderTask implements Runnable {
this.screenName = screenName;
}
+ int page_count = 0;
+ int item_count = 0;
@Override
public void run() {
@@ -119,26 +122,27 @@ public class TwitterFollowingProviderTask implements Runnable {
private void collectUsers(Long id) {
int keepTrying = 0;
-
+ List<twitter4j.User> list = null;
long curser = -1;
+ twitter4j.User user;
+ String userJson;
+ try {
+ user = client.users().showUser(id);
+ userJson = TwitterObjectFactory.getRawJSON(user);
+ } catch (TwitterException ex) {
+ LOGGER.error("Failure looking up " + id);
+ return;
+ }
+
do {
try {
- twitter4j.User user;
- String userJson;
- try {
- user = client.users().showUser(id);
- userJson = TwitterObjectFactory.getRawJSON(user);
- } catch (TwitterException ex) {
- LOGGER.error("Failure looking up " + id);
- break;
- }
- PagableResponseList<twitter4j.User> list = null;
+ PagableResponseList<twitter4j.User> page = null;
if ( provider.getConfig().getEndpoint().equals("followers") ) {
- list = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getMaxItems().intValue());
+ page = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getPageSize().intValue());
} else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- list = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getMaxItems().intValue());
+ page = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getPageSize().intValue());
}
Objects.requireNonNull(list);
@@ -162,27 +166,28 @@ public class TwitterFollowingProviderTask implements Runnable {
Objects.requireNonNull(follow);
- if ( count < provider.getConfig().getMaxItems()) {
+ if ( item_count < provider.getConfig().getMaxItems()) {
ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- count++;
+ item_count++;
}
} catch (Exception ex) {
LOGGER.warn("Exception: {}", ex);
}
}
- if ( !list.hasNext() ) {
+ if ( !page.hasNext() ) {
break;
}
- if ( list.getNextCursor() == 0 ) {
+ if ( page.getNextCursor() == 0 ) {
break;
}
- curser = list.getNextCursor();
+ curser = page.getNextCursor();
+ page_count++;
} catch (Exception ex) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
}
}
- while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+ while (provider.shouldContinuePulling(list) && curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
}
private void collectIds(Long id) {
@@ -190,6 +195,16 @@ public class TwitterFollowingProviderTask implements Runnable {
long curser = -1;
+ twitter4j.User user;
+ String userJson;
+ try {
+ user = client.users().showUser(id);
+ userJson = TwitterObjectFactory.getRawJSON(user);
+ } catch (TwitterException ex) {
+ LOGGER.error("Failure looking up " + id);
+ return;
+ }
+
do {
try {
twitter4j.IDs ids = null;
@@ -218,9 +233,9 @@ public class TwitterFollowingProviderTask implements Runnable {
Objects.requireNonNull(follow);
- if ( count < provider.getConfig().getMaxItems()) {
+ if ( item_count < provider.getConfig().getMaxItems()) {
ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- count++;
+ item_count++;
}
} catch (Exception ex) {
LOGGER.warn("Exception: {}", ex);
@@ -233,13 +248,19 @@ public class TwitterFollowingProviderTask implements Runnable {
break;
}
curser = ids.getNextCursor();
+ page_count++;
} catch (TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
} catch (Exception ex) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
}
}
- while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+ while (shouldContinuePulling() && curser != 0 && keepTrying < provider.getConfig().getRetryMax() );
+ }
+
+ public boolean shouldContinuePulling() {
+ return ( item_count < provider.getConfig().getMaxItems()
+ && page_count < provider.getConfig().getMaxPages());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 88642f8..9778de0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -26,7 +26,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.TwitterTimelineProviderConfiguration;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -84,18 +84,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public static final int MAX_NUMBER_WAITING = 10000;
- private TwitterUserInformationConfiguration config;
+ private TwitterTimelineProviderConfiguration config;
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public TwitterUserInformationConfiguration getConfig() {
+ public TwitterTimelineProviderConfiguration getConfig() {
return config;
}
- public void setConfig(TwitterUserInformationConfiguration config) {
- this.config = config;
- }
-
protected Collection<String[]> screenNameBatches;
protected Collection<Long> ids;
@@ -153,7 +149,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Config typesafe = testResourceConfig.withFallback(reference).resolve();
StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterTimelineProviderConfiguration config = new ComponentConfigurator<>(TwitterTimelineProviderConfiguration.class).detectConfiguration(typesafe, "twitter");
TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
@@ -178,7 +174,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
outStream.flush();
}
- public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
+ public TwitterTimelineProvider(TwitterTimelineProviderConfiguration config) {
this.config = config;
}
@@ -194,6 +190,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
@Override
public void prepare(Object configurationObject) {
+ if( configurationObject instanceof TwitterTimelineProviderConfiguration ) {
+ this.config = (TwitterTimelineProviderConfiguration)configurationObject;
+ }
+
try {
lock.writeLock().lock();
providerQueue = constructQueue();
@@ -232,10 +232,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
}
- public boolean shouldContinuePulling(List<Status> statuses) {
- return (statuses != null) && (statuses.size() > 0);
- }
-
protected void submitTimelineThreads(Long[] ids) {
Twitter client = getTwitterClient();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 67ccf0c..8cb2b46 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Paging;
+import twitter4j.ResponseList;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterObjectFactory;
@@ -60,12 +61,14 @@ public class TwitterTimelineProviderTask implements Runnable {
this.id = id;
}
+ int page_count = 1;
+ int item_count = 0;
+ List<Status> lastPage = null;
+
@Override
public void run() {
- Paging paging = new Paging(1, 200);
- List<Status> statuses = null;
- int count = 0;
+ Paging paging = new Paging(page_count, provider.getConfig().getPageSize().intValue());
LOGGER.info(id + " Thread Starting");
@@ -81,25 +84,27 @@ public class TwitterTimelineProviderTask implements Runnable {
try {
this.client = provider.getTwitterClient();
- statuses = client.getUserTimeline(id, paging);
+ ResponseList<Status> statuses = client.getUserTimeline(id, paging);
for (Status twitterStatus : statuses) {
String json = TwitterObjectFactory.getRawJSON(twitterStatus);
- if ( count < provider.getConfig().getMaxItems() ) {
+ if ( item_count < provider.getConfig().getMaxItems() ) {
try {
org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
} catch (Exception exception) {
LOGGER.warn("Failed to read document as Tweet ", twitterStatus);
}
- count++;
+ item_count++;
}
}
- paging.setPage(paging.getPage() + 1);
+ lastPage = statuses;
+ page_count = paging.getPage() + 1;
+ paging.setPage(page_count);
keepTrying = 10;
} catch (Exception ex) {
@@ -107,10 +112,18 @@ public class TwitterTimelineProviderTask implements Runnable {
}
}
}
- while (provider.shouldContinuePulling(statuses) && count < provider.getConfig().getMaxItems());
+ while (shouldContinuePulling());
LOGGER.info(id + " Thread Finished");
}
+ public boolean shouldContinuePulling() {
+ return (lastPage != null)
+ && item_count < provider.getConfig().getMaxItems()
+ && page_count <= provider.getConfig().getMaxPages();
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
index dda5d1b..89fc7af 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
@@ -13,6 +13,21 @@
"type": "boolean",
"description": "Whether to collect ids only, or full profiles",
"default": "true"
+ },
+ "max_items": {
+ "type": "integer",
+ "description": "Max items per user to collect",
+ "default": 50000
+ },
+ "max_pages": {
+ "type": "integer",
+ "description": "Max pages per user to request",
+ "default": 10
+ },
+ "page_size": {
+ "type": "integer",
+ "description": "Max items per page to request",
+ "default": 5000
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
new file mode 100644
index 0000000..37ed60e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterTimelineProviderConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
+ "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "max_items": {
+ "type": "integer",
+ "description": "Max items per user to collect",
+ "default": 3200
+ },
+ "max_pages": {
+ "type": "integer",
+ "description": "Max pages per user to request",
+ "default": 16
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5e1bd10b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
index 01b1c26..405c87a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
@@ -16,10 +16,10 @@
"type": "string"
}
},
- "max_items": {
+ "page_size": {
"type": "integer",
- "description": "Max items per user to collect",
- "default": 5000
+ "description": "Max items per page to request",
+ "default": 200
}
}
}
\ No newline at end of file