You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:42 UTC
[4/8] incubator-streams git commit: fixes while testing flink examples
fixes while testing flink examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9495cf52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9495cf52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9495cf52
Branch: refs/heads/master
Commit: 9495cf52e3d1c5d5100566364bfa30447555682a
Parents: d9e58cd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:41:48 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:41:48 2016 -0500
----------------------------------------------------------------------
.../provider/TwitterFollowingProvider.java | 9 ++--
.../provider/TwitterFollowingProviderTask.java | 52 +++++++++-----------
.../provider/TwitterStreamProcessor.java | 5 +-
.../TwitterUserInformationProvider.java | 6 ++-
4 files changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 27c8526..4c3a828 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -88,7 +88,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
Twitter client = getTwitterClient();
for (int i = 0; i < ids.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
executor.submit(providerTask);
}
}
@@ -97,7 +97,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
Twitter client = getTwitterClient();
for (int i = 0; i < screenNames.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
executor.submit(providerTask);
}
@@ -106,7 +106,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
@Override
public StreamsResultSet readCurrent() {
- LOGGER.debug("Providing {} docs", providerQueue.size());
+ LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
StreamsResultSet result;
@@ -115,12 +115,13 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
+ LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
} finally {
lock.writeLock().unlock();
}
if (providerQueue.isEmpty() && executor.isTerminated()) {
- LOGGER.info("Finished. Cleaning up...");
+ LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
running.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index cc71d48..f2346fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -44,27 +44,20 @@ public class TwitterFollowingProviderTask implements Runnable {
protected TwitterFollowingProvider provider;
protected Twitter client;
protected Long id;
- protected Boolean idsOnly;
protected String screenName;
- protected String endpoint;
- private int max_per_page = 200;
int count = 0;
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint, Boolean idsOnly) {
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
this.provider = provider;
this.client = twitter;
this.id = id;
- this.endpoint = endpoint;
- this.idsOnly = idsOnly;
}
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint, Boolean idsOnly) {
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
this.provider = provider;
this.client = twitter;
this.screenName = screenName;
- this.endpoint = endpoint;
- this.idsOnly = idsOnly;
}
@@ -84,9 +77,9 @@ public class TwitterFollowingProviderTask implements Runnable {
protected void getFollowing(Long id) {
- Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+ Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
- if( idsOnly )
+ if( provider.getConfig().getIdsOnly() )
collectIds(id);
else
collectUsers(id);
@@ -112,10 +105,10 @@ public class TwitterFollowingProviderTask implements Runnable {
}
PagableResponseList<twitter4j.User> list = null;
- if( endpoint.equals("followers") )
- list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
- else if( endpoint.equals("friends") )
- list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+ if( provider.getConfig().getEndpoint().equals("followers") )
+ list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ else if( provider.getConfig().getEndpoint().equals("friends") )
+ list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
Preconditions.checkNotNull(list);
Preconditions.checkArgument(list.size() > 0);
@@ -126,11 +119,11 @@ public class TwitterFollowingProviderTask implements Runnable {
try {
Follow follow = null;
- if( endpoint.equals("followers") ) {
+ if( provider.getConfig().getEndpoint().equals("followers") ) {
follow = new Follow()
.withFollowee(mapper.readValue(userJson, User.class))
.withFollower(mapper.readValue(otherJson, User.class));
- } else if( endpoint.equals("friends") ) {
+ } else if( provider.getConfig().getEndpoint().equals("friends") ) {
follow = new Follow()
.withFollowee(mapper.readValue(otherJson, User.class))
.withFollower(mapper.readValue(userJson, User.class));
@@ -147,9 +140,9 @@ public class TwitterFollowingProviderTask implements Runnable {
LOGGER.warn("Exception: {}", e);
}
}
- if( list.size() == max_per_page )
- curser = list.getNextCursor();
- else break;
+ if( !list.hasNext() ) break;
+ if( list.getNextCursor() == 0 ) break;
+ curser = list.getNextCursor();
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -170,10 +163,10 @@ public class TwitterFollowingProviderTask implements Runnable {
try
{
twitter4j.IDs ids = null;
- if( endpoint.equals("followers") )
- ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, max_per_page);
- else if( endpoint.equals("friends") )
- ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, max_per_page);
+ if( provider.getConfig().getEndpoint().equals("followers") )
+ ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ else if( provider.getConfig().getEndpoint().equals("friends") )
+ ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
Preconditions.checkNotNull(ids);
Preconditions.checkArgument(ids.getIDs().length > 0);
@@ -182,16 +175,15 @@ public class TwitterFollowingProviderTask implements Runnable {
try {
Follow follow = null;
- if( endpoint.equals("followers") ) {
+ if( provider.getConfig().getEndpoint().equals("followers") ) {
follow = new Follow()
.withFollowee(new User().withId(id))
.withFollower(new User().withId(otherId));
- } else if( endpoint.equals("friends") ) {
+ } else if( provider.getConfig().getEndpoint().equals("friends") ) {
follow = new Follow()
.withFollowee(new User().withId(otherId))
.withFollower(new User().withId(id));
}
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
Preconditions.checkNotNull(follow);
@@ -203,9 +195,9 @@ public class TwitterFollowingProviderTask implements Runnable {
LOGGER.warn("Exception: {}", e);
}
}
- if( ids.hasNext() )
- curser = ids.getNextCursor();
- else break;
+ if( !ids.hasNext() ) break;
+ if( ids.getNextCursor() == 0 ) break;
+ curser = ids.getNextCursor();
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index f0690f8..8ea65eb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -88,8 +88,9 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
@Override
public List<StreamsDatum> call() throws Exception {
if(item != null) {
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
+ Class itemClass = TwitterEventClassifier.detectClass(item);
+ Object document = mapper.readValue(item, itemClass);
+ StreamsDatum rawDatum = new StreamsDatum(document);
return Lists.newArrayList(rawDatum);
}
return Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 78eb3e6..4231f56 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -197,7 +197,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
public StreamsResultSet readCurrent() {
- LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+ LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
StreamsResultSet result;
@@ -206,7 +206,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
- LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
} finally {
lock.writeLock().unlock();
}
@@ -215,6 +215,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
running.set(false);
+
+ LOGGER.info("Exiting");
}
return result;