You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:42 UTC

[4/8] incubator-streams git commit: fixes while testing flink examples

fixes while testing flink examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9495cf52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9495cf52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9495cf52

Branch: refs/heads/master
Commit: 9495cf52e3d1c5d5100566364bfa30447555682a
Parents: d9e58cd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:41:48 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:41:48 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowingProvider.java      |  9 ++--
 .../provider/TwitterFollowingProviderTask.java  | 52 +++++++++-----------
 .../provider/TwitterStreamProcessor.java        |  5 +-
 .../TwitterUserInformationProvider.java         |  6 ++-
 4 files changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 27c8526..4c3a828 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -88,7 +88,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
         Twitter client = getTwitterClient();
 
         for (int i = 0; i < ids.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
             executor.submit(providerTask);
         }
     }
@@ -97,7 +97,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
         Twitter client = getTwitterClient();
 
         for (int i = 0; i < screenNames.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
             executor.submit(providerTask);
         }
 
@@ -106,7 +106,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     @Override
     public StreamsResultSet readCurrent() {
 
-        LOGGER.debug("Providing {} docs", providerQueue.size());
+        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
 
         StreamsResultSet result;
 
@@ -115,12 +115,13 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
             result = new StreamsResultSet(providerQueue);
             result.setCounter(new DatumStatusCounter());
             providerQueue = constructQueue();
+            LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
         } finally {
             lock.writeLock().unlock();
         }
 
         if (providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("Finished.  Cleaning up...");
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
             running.set(false);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index cc71d48..f2346fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -44,27 +44,20 @@ public class TwitterFollowingProviderTask implements Runnable {
     protected TwitterFollowingProvider provider;
     protected Twitter client;
     protected Long id;
-    protected Boolean idsOnly;
     protected String screenName;
-    protected String endpoint;
 
-    private int max_per_page = 200;
     int count = 0;
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint, Boolean idsOnly) {
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
         this.provider = provider;
         this.client = twitter;
         this.id = id;
-        this.endpoint = endpoint;
-        this.idsOnly = idsOnly;
     }
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint, Boolean idsOnly) {
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
         this.provider = provider;
         this.client = twitter;
         this.screenName = screenName;
-        this.endpoint = endpoint;
-        this.idsOnly = idsOnly;
     }
 
 
@@ -84,9 +77,9 @@ public class TwitterFollowingProviderTask implements Runnable {
 
     protected void getFollowing(Long id) {
 
-        Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+        Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
 
-        if( idsOnly )
+        if( provider.getConfig().getIdsOnly() )
             collectIds(id);
         else
             collectUsers(id);
@@ -112,10 +105,10 @@ public class TwitterFollowingProviderTask implements Runnable {
                 }
 
                 PagableResponseList<twitter4j.User> list = null;
-                if( endpoint.equals("followers") )
-                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
-                else if( endpoint.equals("friends") )
-                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+                if( provider.getConfig().getEndpoint().equals("followers") )
+                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+                else if( provider.getConfig().getEndpoint().equals("friends") )
+                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
 
                 Preconditions.checkNotNull(list);
                 Preconditions.checkArgument(list.size() > 0);
@@ -126,11 +119,11 @@ public class TwitterFollowingProviderTask implements Runnable {
 
                     try {
                         Follow follow = null;
-                        if( endpoint.equals("followers") ) {
+                        if( provider.getConfig().getEndpoint().equals("followers") ) {
                             follow = new Follow()
                                     .withFollowee(mapper.readValue(userJson, User.class))
                                     .withFollower(mapper.readValue(otherJson, User.class));
-                        } else if( endpoint.equals("friends") ) {
+                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
                             follow = new Follow()
                                     .withFollowee(mapper.readValue(otherJson, User.class))
                                     .withFollower(mapper.readValue(userJson, User.class));
@@ -147,9 +140,9 @@ public class TwitterFollowingProviderTask implements Runnable {
                         LOGGER.warn("Exception: {}", e);
                     }
                 }
-                if( list.size() == max_per_page )
-                    curser = list.getNextCursor();
-                else break;
+                if( !list.hasNext() ) break;
+                if( list.getNextCursor() == 0 ) break;
+                curser = list.getNextCursor();
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -170,10 +163,10 @@ public class TwitterFollowingProviderTask implements Runnable {
             try
             {
                 twitter4j.IDs ids = null;
-                if( endpoint.equals("followers") )
-                    ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, max_per_page);
-                else if( endpoint.equals("friends") )
-                    ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, max_per_page);
+                if( provider.getConfig().getEndpoint().equals("followers") )
+                    ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+                else if( provider.getConfig().getEndpoint().equals("friends") )
+                    ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
 
                 Preconditions.checkNotNull(ids);
                 Preconditions.checkArgument(ids.getIDs().length > 0);
@@ -182,16 +175,15 @@ public class TwitterFollowingProviderTask implements Runnable {
 
                     try {
                         Follow follow = null;
-                        if( endpoint.equals("followers") ) {
+                        if( provider.getConfig().getEndpoint().equals("followers") ) {
                             follow = new Follow()
                                     .withFollowee(new User().withId(id))
                                     .withFollower(new User().withId(otherId));
-                        } else if( endpoint.equals("friends") ) {
+                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
                             follow = new Follow()
                                     .withFollowee(new User().withId(otherId))
                                     .withFollower(new User().withId(id));
                         }
-                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
 
                         Preconditions.checkNotNull(follow);
 
@@ -203,9 +195,9 @@ public class TwitterFollowingProviderTask implements Runnable {
                         LOGGER.warn("Exception: {}", e);
                     }
                 }
-                if( ids.hasNext() )
-                    curser = ids.getNextCursor();
-                else break;
+                if( !ids.hasNext() ) break;
+                if( ids.getNextCursor() == 0 ) break;
+                curser = ids.getNextCursor();
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index f0690f8..8ea65eb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -88,8 +88,9 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
         @Override
         public List<StreamsDatum> call() throws Exception {
             if(item != null) {
-                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+                Class itemClass = TwitterEventClassifier.detectClass(item);
+                Object document = mapper.readValue(item, itemClass);
+                StreamsDatum rawDatum = new StreamsDatum(document);
                 return Lists.newArrayList(rawDatum);
             }
             return Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 78eb3e6..4231f56 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -197,7 +197,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     public StreamsResultSet readCurrent() {
 
-        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+        LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
 
         StreamsResultSet result;
 
@@ -206,7 +206,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             result = new StreamsResultSet(providerQueue);
             result.setCounter(new DatumStatusCounter());
             providerQueue = constructQueue();
-            LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+            LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
         } finally {
             lock.writeLock().unlock();
         }
@@ -215,6 +215,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
             running.set(false);
+
+            LOGGER.info("Exiting");
         }
 
         return result;