You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/04/02 23:21:52 UTC

[1/9] incubator-streams git commit: STREAMS-235

Repository: incubator-streams
Updated Branches:
  refs/heads/master 2b994de99 -> a71653cff


STREAMS-235


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/43a3818f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/43a3818f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/43a3818f

Branch: refs/heads/master
Commit: 43a3818f59c6d0ca3c37e11b92160b52327cd903
Parents: 5082690
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 25 14:14:14 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 12:59:12 2015 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            |   1 +
 .../provider/TwitterFollowersProviderTask.java  | 120 ++++++++++++++++++
 .../provider/TwitterFollowingProvider.java      | 114 +++++++++++++++++
 .../provider/TwitterFriendsProviderTask.java    | 123 +++++++++++++++++++
 .../TwitterUserInformationProvider.java         |   7 +-
 .../TwitterFollowActivitySerializer.java        |  72 +++++++++++
 .../src/main/jsonschema/com/twitter/Follow.json |  14 +++
 7 files changed, 446 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 0f9d361..26211d7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -142,6 +142,7 @@
                         <sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/UserstreamEvent.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/twitter/Follow.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/FriendList.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
new file mode 100644
index 0000000..c0f8e8a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+import java.io.IOException;
+
+/**
+ *  Retrieve recent posts for a single user id.
+ */
+public class TwitterFollowersProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+    protected TwitterFollowingProvider provider;
+    protected Twitter client;
+    protected Long id;
+
+    public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+        this.provider = provider;
+        this.client = twitter;
+        this.id = id;
+    }
+
+    @Override
+    public void run() {
+
+        getFollowers(id);
+
+        LOGGER.info(id + " Thread Finished");
+
+    }
+
+    protected void getFollowers(Long id) {
+
+        int keepTrying = 0;
+
+        long paging = 1;
+
+
+        // keep trying to load, give it 5 attempts.
+        //while (keepTrying < 10)
+        while (keepTrying < 5)
+        {
+            try
+            {
+                twitter4j.User followee4j;
+                String followeeJson;
+                try {
+                    followee4j = client.users().showUser(id);
+                    followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+                } catch (TwitterException e) {
+                    LOGGER.error("Failure looking up " + id);
+                    break;
+                }
+
+                for (twitter4j.User follower4j : client.friendsFollowers().getFollowersList(id.longValue(), paging)) {
+
+                    String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+                    try {
+                        Follow follow = new Follow()
+                                .withFollowee(mapper.readValue(followeeJson, User.class))
+                                .withFollower(mapper.readValue(followerJson, User.class));
+
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+                    } catch (JsonParseException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (JsonMappingException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (IOException e) {
+                        LOGGER.warn(e.getMessage());
+                    }
+                    paging++;
+                    keepTrying = 10;
+                }
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
new file mode 100644
index 0000000..a4df278
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -0,0 +1,114 @@
+package org.apache.streams.twitter.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.Twitter;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Created by sblackmon on 11/25/14.
+ */
+public class TwitterFollowingProvider extends TwitterUserInformationProvider {
+
+    public static final String STREAMS_ID = "TwitterFollowingProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public static final int MAX_NUMBER_WAITING = 10000;
+
+    public TwitterFollowingProvider() {
+        super(TwitterConfigurator.detectTwitterUserInformationConfiguration(StreamsConfigurator.config.getConfig("twitter")));
+    }
+
+    public TwitterFollowingProvider(TwitterUserInformationConfiguration config) {
+        super(config);
+    }
+
+    @Override
+    public void startStream() {
+
+        running.set(true);
+
+        Preconditions.checkArgument(idsBatches.hasNext());
+
+        LOGGER.info("startStream");
+
+        while (idsBatches.hasNext()) {
+            submitFollowingThreads(idsBatches.next());
+        }
+
+        running.set(true);
+
+        executor.shutdown();
+
+    }
+
+    protected void submitFollowingThreads(Long[] ids) {
+        Twitter client = getTwitterClient();
+
+        if( getConfig().getEndpoint().equals("friends") ) {
+            for (int i = 0; i < ids.length; i++) {
+                TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, ids[i]);
+                executor.submit(providerTask);
+            }
+        } else if( getConfig().getEndpoint().equals("followers") ) {
+            for (int i = 0; i < ids.length; i++) {
+                TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, ids[i]);
+                executor.submit(providerTask);
+            }
+        }
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result;
+
+        try {
+            lock.writeLock().lock();
+            result = new StreamsResultSet(providerQueue);
+            result.setCounter(new DatumStatusCounter());
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (providerQueue.isEmpty() && executor.isTerminated()) {
+            LOGGER.info("Finished.  Cleaning up...");
+
+            running.set(false);
+
+            LOGGER.info("Exiting");
+        }
+
+        return result;
+
+    }
+
+    protected Queue<StreamsDatum> constructQueue() {
+        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+    }
+
+    @Override
+    public void prepare(Object o) {
+        super.prepare(o);
+        Preconditions.checkNotNull(getConfig().getEndpoint());
+        Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+        return;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
new file mode 100644
index 0000000..5d3a7d2
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+import twitter4j.User;
+
+import java.io.IOException;
+
+/**
+ *  Retrieve recent posts for a single user id.
+ */
+public class TwitterFriendsProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsProviderTask.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+    protected TwitterFollowingProvider provider;
+    protected Twitter client;
+    protected Long id;
+
+    public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+        this.provider = provider;
+        this.client = twitter;
+        this.id = id;
+    }
+
+    @Override
+    public void run() {
+
+        getFriends(id);
+
+        LOGGER.info(id + " Thread Finished");
+
+    }
+
+    protected void getFriends(Long id) {
+
+        int keepTrying = 0;
+
+        long curser = -1;
+
+        // keep trying to load, give it 5 attempts.
+        //while (keepTrying < 10)
+        while (keepTrying < 1)
+        {
+            try
+            {
+                twitter4j.User follower4j;
+                String followerJson;
+                try {
+                    follower4j = client.users().showUser(id);
+                    followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+                } catch (TwitterException e) {
+                    LOGGER.error("Failure looking up " + id);
+                    break;
+                }
+
+                PagableResponseList<User> followeeList = client.friendsFollowers().getFriendsList(id.longValue(), curser);
+
+                for (twitter4j.User followee4j : followeeList ) {
+
+                    String followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+
+                    try {
+                        Follow follow = new Follow()
+                                .withFollowee(mapper.readValue(followeeJson, org.apache.streams.twitter.pojo.User.class))
+                                .withFollower(mapper.readValue(followerJson, org.apache.streams.twitter.pojo.User.class));
+
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+                    } catch (JsonParseException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (JsonMappingException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (IOException e) {
+                        LOGGER.warn(e.getMessage());
+                    }
+                }
+                curser = followeeList.getNextCursor();
+                keepTrying = 10;
+
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index d5bf317..cad5aa8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -54,7 +54,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     public static final String STREAMS_ID = "TwitterUserInformationProvider";
     private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
 
-
     private TwitterUserInformationConfiguration twitterUserInformationConfiguration;
 
     private Class klass;
@@ -110,8 +109,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         running.set(true);
     }
 
-
-    private void loadBatch(Long[] ids) {
+    protected void loadBatch(Long[] ids) {
         Twitter client = getTwitterClient();
         int keepTrying = 0;
 
@@ -140,7 +138,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         }
     }
 
-    private void loadBatch(String[] ids) {
+    protected void loadBatch(String[] ids) {
         Twitter client = getTwitterClient();
         int keepTrying = 0;
 
@@ -234,7 +232,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
         Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
         Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey());
         Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret());
         Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
new file mode 100644
index 0000000..60da85a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
+
+public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
+
+    public TwitterFollowActivitySerializer() {}
+
+    private static TwitterFollowActivitySerializer instance = new TwitterFollowActivitySerializer();
+
+    public static TwitterFollowActivitySerializer getInstance() {
+        return instance;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public Follow serialize(Activity deserialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public Activity deserialize(Follow event) throws ActivitySerializerException {
+
+        Activity activity = new Activity();
+        activity.setVerb("follow");
+        activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
+        activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Follow> serializedList) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
new file mode 100644
index 0000000..148038e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.pojo.Follow",
+    "properties": {
+        "follower": {
+            "$ref": "User.json"
+        },
+        "followee": {
+            "$ref": "User.json"
+        }
+    }
+}
\ No newline at end of file


[7/9] incubator-streams git commit: fixes from integration testing with incubator-streams-examples/twitter-follow-graph

Posted by sb...@apache.org.
fixes from integration testing with incubator-streams-examples/twitter-follow-graph


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bf1cfaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bf1cfaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bf1cfaf

Branch: refs/heads/master
Commit: 8bf1cfaf05455c28df33eb825735555a61b2255a
Parents: 4bfbfc4
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Mar 27 10:00:52 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Mar 27 10:00:52 2015 -0500

----------------------------------------------------------------------
 .../TwitterFollowActivityConverter.java         |   5 +-
 .../provider/TwitterFollowingProvider.java      |   2 +-
 .../TwitterActivityConverterProcessorTest.java  | 115 ---------------
 .../test/TwitterActivityConvertersTest.java     | 139 ++++++-------------
 4 files changed, 48 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
index dedda69..e0ed4a4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Provider;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
 
@@ -64,7 +66,8 @@ public class TwitterFollowActivityConverter implements ActivityConverter<Follow>
         activity.setVerb("follow");
         activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
         activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
-
+        activity.setId(activity.getActor().getId() + "-follow->" + activity.getObject().getId());
+        activity.setProvider((Provider) new Provider().withId("twitter"));
         return Lists.newArrayList(activity);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 24dea0f..70c92c9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -112,7 +112,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     @Override
     public StreamsResultSet readCurrent() {
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        LOGGER.debug("Providing {} docs", providerQueue.size());
 
         StreamsResultSet result;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
deleted file mode 100644
index 61a3a6b..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.converter.ActivityConverterProcessorConfiguration;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterActivityConverterProcessorTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConverterProcessorTest.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
-    private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
 t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
 e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
 DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
 img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
 371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
-
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = TwitterActivityConverterProcessorTest.class.getResourceAsStream("/testtweets.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        ObjectNode event = null;
-        try {
-            event = (ObjectNode) mapper.readTree(TWITTER_JSON);
-        } catch (IOException e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
-
-        assertThat(event, is(not(nullValue())));
-
-        Retweet retweet = mapper.convertValue(event, Retweet.class);
-
-        assertThat(retweet, is(not(nullValue())));
-        assertThat(retweet.getCreatedAt(), is(not(nullValue())));
-        assertThat(retweet.getText(), is(not(nullValue())));
-        assertThat(retweet.getUser(), is(not(nullValue())));
-        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
-
-        Activity activity = null;
-        try {
-            ActivityConverterProcessorConfiguration converterProcessorConfiguration =
-                    new ActivityConverterProcessorConfiguration()
-                            .withClassifiers((List) Lists.newArrayList(new TwitterDocumentClassifier()));
-            ActivityConverterProcessor converter = new ActivityConverterProcessor(converterProcessorConfiguration);
-            converter.prepare(converterProcessorConfiguration);
-            List<StreamsDatum> result = converter.process(new StreamsDatum(TWITTER_JSON));
-            activity = (Activity)result.get(0).getDocument();
-        } catch (Throwable e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
-
-        assertThat(activity, is(not(nullValue())));
-
-        assertThat(activity.getId(), is(not(nullValue())));
-        assertThat(activity.getActor(), is(not(nullValue())));
-        assertThat(activity.getActor().getId(), is(not(nullValue())));
-        assertThat(activity.getVerb(), is(not(nullValue())));
-        assertThat(activity.getProvider(), is(not(nullValue())));
-        assertThat(activity.getObject(), is(not(nullValue())));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
index a8f5d86..d43cc27 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
@@ -19,28 +19,23 @@
 package org.apache.streams.twitter.test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.converter.TypeConverterUtil;
-import org.apache.streams.data.ActivityConverter;
+import com.google.common.collect.Lists;
+import org.apache.streams.converter.ActivityConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterJsonRetweetActivityConverter;
-import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.util.List;
 
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -54,96 +49,48 @@ public class TwitterActivityConvertersTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(StreamsTwitterMapper.TWITTER_FORMAT);
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
-    @Test
-    public void Tests()
-    {
-        InputStream is = TwitterActivityConvertersTest.class.getResourceAsStream("/testtweets.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        ActivityConverter activityConverter;
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    LOGGER.info("raw: {}", line);
-
-                    Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
-
-                    if( detected == Tweet.class ) {
-                        activityConverter = new TwitterJsonTweetActivityConverter();
-                    } else if( detected == Retweet.class ) {
-                        activityConverter = new TwitterJsonRetweetActivityConverter();
-                    } else {
-                        Assert.fail();
-                        return;
-                    }
-
-                    Object typedObject = TypeConverterUtil.convert(line, detected, mapper);
-
-                    Activity activity = (Activity) activityConverter.toActivityList(typedObject).get(0);
-
-                    String activitystring = mapper.writeValueAsString(activity);
-
-                    LOGGER.info("activity: {}", activitystring);
-
-                    assertThat(activity, is(not(nullValue())));
-
-                    assertThat(activity.getId(), is(not(nullValue())));
-                    assertThat(activity.getActor(), is(not(nullValue())));
-                    assertThat(activity.getActor().getId(), is(not(nullValue())));
-                    assertThat(activity.getVerb(), is(not(nullValue())));
-                    assertThat(activity.getProvider(), is(not(nullValue())));
-
-                    if( detected == Tweet.class ) {
-
-                        assertEquals(activity.getVerb(), "post");
-
-                        Tweet tweet = mapper.readValue(line, Tweet.class);
-
-                        if( tweet.getEntities() != null &&
-                            tweet.getEntities().getUrls() != null &&
-                            tweet.getEntities().getUrls().size() > 0 ) {
-
-
-                            assertThat(activity.getLinks(), is(not(nullValue())));
-                            assertEquals(tweet.getEntities().getUrls().size(), activity.getLinks().size());
-                        }
+    private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
 
-                    } else if( detected == Retweet.class ) {
+    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":fa
 lse,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors
 \":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682385797121,\"id_str\":\"410898682385797121\",\"text\":\"RT @hemocional: Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"163149656\",\"name\":\"Carolina\",\"screen_name\":\"_titinaok\",\"location\":\"Montevideo\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=N3v5vZ-tU1E\",\"description\":\"Tantas veces me defin\\u00ed ...Soy nada y todo a la vez\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":fal
 se,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/163149656\\/1379722210\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"pr
 ofile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":410898060206960640,\"id_str\":\"410898060206960640\",\"text\":\"Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"\\u003ca href=\\\"http:\\/\\/bufferapp.com\\\" rel=\\\"nofollow\\\"\\u003eBuffer\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":552929456,\"id_str\":\"552929456\",\"name\":\"Habilidad emocional\",\"screen_name\":\"hemocional\",\"location\":\"\",\"url\":\"http:\\/\\/www.hab
 ilidademocional.com\",\"description\":\"Pensamientos y reflexiones para ayudar a mirar la vida de una manera m\\u00e1s saludable y a crecer interiormente cada d\\u00eda m\\u00e1s. #InteligenciaEmocional #Psicolog\\u00eda\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg
 .com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/552929456\\/1383180255\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"he
 mocional\",\"name\":\"Habilidad emocional\",\"id\":552929456,\"id_str\":\"552929456\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+    private String delete = "{\"delete\":{\"status\":{\"id\":377518972486553600,\"user_id\":1249045572,\"id_str\":\"377518972486553600\",\"user_id_str\":\"1249045572\"}}}\n";
+    private String follow = "{\"follower\":{\"id\":1249045572},\"followee\":{\"id\":32386852}}\n";
 
-                        Retweet retweet = mapper.readValue(line, Retweet.class);
-
-                        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
-
-                        assertEquals(activity.getVerb(), "share");
-
-                        assertThat(activity.getObject(), is(not(nullValue())));
-                        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-                        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-
-                        if( retweet.getRetweetedStatus().getEntities() != null &&
-                            retweet.getRetweetedStatus().getEntities().getUrls() != null &&
-                            retweet.getRetweetedStatus().getEntities().getUrls().size() > 0 ) {
-
-                            assertThat(activity.getLinks(), is(not(nullValue())));
-                            assertEquals(retweet.getRetweetedStatus().getEntities().getUrls().size(), activity.getLinks().size());
-                        }
-
-                    }
+    @Test
+    public void testConvertTweet() {
+        List<Activity> activityList = activityConverterUtil.convert(tweet);
+        Assert.assertTrue(activityList.size() == 1);
+        Activity activity = activityList.get(0);
+        if( !ActivityUtil.isValid(activity) )
+            Assert.fail();
+    }
 
+    @Test
+    public void testConvertRetweet() {
+        List<Activity> activityList = activityConverterUtil.convert(retweet);
+        Assert.assertTrue(activityList.size() == 1);
+        Activity activity = activityList.get(0);
+        if( !ActivityUtil.isValid(activity) )
+            Assert.fail();
+    }
 
+    @Test
+    public void testConvertDelete() {
+        List<Activity> activityList = activityConverterUtil.convert(delete);
+        Assert.assertTrue(activityList.size() == 1);
+        Activity activity = activityList.get(0);
+        if( !ActivityUtil.isValid(activity) )
+            Assert.fail();
+    }
 
-                }
-            }
-        } catch( Exception e ) {
-            System.out.println(e);
-            e.printStackTrace();
+    @Test
+    public void testConvertFollow() {
+        List<Activity> activityList = activityConverterUtil.convert(follow);
+        Assert.assertTrue(activityList.size() == 1);
+        Activity activity = activityList.get(0);
+        if( !ActivityUtil.isValid(activity) )
             Assert.fail();
-        }
     }
 }


[3/9] incubator-streams git commit: loop logic

Posted by sb...@apache.org.
loop logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cd6d7c1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cd6d7c1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cd6d7c1a

Branch: refs/heads/master
Commit: cd6d7c1a11ff5a29a474b13fa8e8a0968d01c006
Parents: e5d5086
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 25 15:39:01 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 13:04:58 2015 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowersProviderTask.java     | 17 ++++++++---------
 .../provider/TwitterFriendsProviderTask.java       |  8 ++------
 2 files changed, 10 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cd6d7c1a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index 9f91267..03c0640 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -29,6 +29,7 @@ import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterObjectFactory;
@@ -67,12 +68,9 @@ public class TwitterFollowersProviderTask implements Runnable {
 
         int keepTrying = 0;
 
-        long paging = 1;
+        long curser = -1;
 
-
-        // keep trying to load, give it 5 attempts.
-        //while (keepTrying < 10)
-        while (keepTrying < 5)
+        do
         {
             try
             {
@@ -86,7 +84,9 @@ public class TwitterFollowersProviderTask implements Runnable {
                     break;
                 }
 
-                for (twitter4j.User follower4j : client.friendsFollowers().getFollowersList(id.longValue(), paging)) {
+                PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(id.longValue(), curser);
+
+                for (twitter4j.User follower4j : followerList) {
 
                     String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
 
@@ -103,9 +103,8 @@ public class TwitterFollowersProviderTask implements Runnable {
                     } catch (IOException e) {
                         LOGGER.warn(e.getMessage());
                     }
-                    paging++;
-                    keepTrying = 10;
                 }
+                curser = followerList.getNextCursor();
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -113,7 +112,7 @@ public class TwitterFollowersProviderTask implements Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
             }
-        }
+        } while (curser != 0 && keepTrying < 10);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cd6d7c1a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 408a614..434e208 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -70,9 +70,7 @@ public class TwitterFriendsProviderTask implements Runnable {
 
         long curser = -1;
 
-        // keep trying to load, give it 5 attempts.
-        //while (keepTrying < 10)
-        while (keepTrying < 1)
+        do
         {
             try
             {
@@ -107,8 +105,6 @@ public class TwitterFriendsProviderTask implements Runnable {
                     }
                 }
                 curser = followeeList.getNextCursor();
-                keepTrying = 10;
-
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -116,7 +112,7 @@ public class TwitterFriendsProviderTask implements Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
             }
-        }
+        } while (curser != 0 && keepTrying < 10);
     }
 
 }


[4/9] incubator-streams git commit: screen name support

Posted by sb...@apache.org.
screen name support


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3dceeb5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3dceeb5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3dceeb5e

Branch: refs/heads/master
Commit: 3dceeb5efdb260e1b3c4f7cf15b4e38ccb23df8c
Parents: cd6d7c1
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 27 12:32:13 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:14:14 2015 -0500

----------------------------------------------------------------------
 .../TwitterFollowActivityConverter.java         | 81 ++++++++++++++++++++
 .../provider/TwitterFollowersProviderTask.java  | 67 +++++++++++++++-
 .../provider/TwitterFollowingProvider.java      | 21 ++++-
 .../provider/TwitterFriendsProviderTask.java    | 67 +++++++++++++++-
 .../TwitterFollowActivitySerializer.java        | 70 -----------------
 5 files changed, 231 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
new file mode 100644
index 0000000..dedda69
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.converter;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TwitterFollowActivityConverter implements ActivityConverter<Follow>, Serializable {
+
+    public TwitterFollowActivityConverter() {
+    }
+
+    private static TwitterFollowActivityConverter instance = new TwitterFollowActivityConverter();
+
+    public static TwitterFollowActivityConverter getInstance() {
+        return instance;
+    }
+
+    public static Class requiredClass = Follow.class;
+
+    @Override
+    public Class requiredClass() {
+        return requiredClass;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public Follow fromActivity(Activity deserialized) throws ActivityConversionException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public List<Activity> toActivityList(Follow event) throws ActivityConversionException {
+
+        Activity activity = new Activity();
+        activity.setVerb("follow");
+        activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
+        activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
+
+        return Lists.newArrayList(activity);
+    }
+
+    @Override
+    public List<Follow> fromActivityList(List<Activity> list) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public List<Activity> toActivityList(List<Follow> list) {
+        throw new NotImplementedException();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index 03c0640..cb679a6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -48,6 +48,7 @@ public class TwitterFollowersProviderTask implements Runnable {
     protected TwitterFollowingProvider provider;
     protected Twitter client;
     protected Long id;
+    protected String screenName;
 
     public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
         this.provider = provider;
@@ -55,12 +56,22 @@ public class TwitterFollowersProviderTask implements Runnable {
         this.id = id;
     }
 
+    public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+        this.provider = provider;
+        this.client = twitter;
+        this.screenName = screenName;
+    }
+
+
     @Override
     public void run() {
 
-        getFollowers(id);
+        if( id != null )
+            getFollowers(id);
+        if( screenName != null)
+            getFollowers(screenName);
 
-        LOGGER.info(id + " Thread Finished");
+        LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
 
     }
 
@@ -115,4 +126,56 @@ public class TwitterFollowersProviderTask implements Runnable {
         } while (curser != 0 && keepTrying < 10);
     }
 
+    protected void getFollowers(String screenName) {
+
+        int keepTrying = 0;
+
+        long curser = -1;
+
+        do
+        {
+            try
+            {
+                twitter4j.User followee4j;
+                String followeeJson;
+                try {
+                    followee4j = client.users().showUser(screenName);
+                    followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+                } catch (TwitterException e) {
+                    LOGGER.error("Failure looking up " + screenName);
+                    break;
+                }
+
+                PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(screenName, curser);
+
+                for (twitter4j.User follower4j : followerList) {
+
+                    String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+                    try {
+                        Follow follow = new Follow()
+                                .withFollowee(mapper.readValue(followeeJson, User.class))
+                                .withFollower(mapper.readValue(followerJson, User.class));
+
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+                    } catch (JsonParseException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (JsonMappingException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (IOException e) {
+                        LOGGER.warn(e.getMessage());
+                    }
+                }
+                curser = followerList.getNextCursor();
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        } while (curser != 0 && keepTrying < 10);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 0a62fc8..24dea0f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -60,13 +60,16 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
         running.set(true);
 
-        Preconditions.checkArgument(idsBatches.hasNext());
+        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
 
         LOGGER.info("startStream");
 
         while (idsBatches.hasNext()) {
             submitFollowingThreads(idsBatches.next());
         }
+        while (screenNameBatches.hasNext()) {
+            submitFollowingThreads(screenNameBatches.next());
+        }
 
         running.set(true);
 
@@ -90,6 +93,22 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
         }
     }
 
+    protected void submitFollowingThreads(String[] screenNames) {
+        Twitter client = getTwitterClient();
+
+        if( getConfig().getEndpoint().equals("friends") ) {
+            for (int i = 0; i < screenNames.length; i++) {
+                TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, screenNames[i]);
+                executor.submit(providerTask);
+            }
+        } else if( getConfig().getEndpoint().equals("followers") ) {
+            for (int i = 0; i < screenNames.length; i++) {
+                TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, screenNames[i]);
+                executor.submit(providerTask);
+            }
+        }
+    }
+
     @Override
     public StreamsResultSet readCurrent() {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 434e208..c5ababe 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -48,6 +48,7 @@ public class TwitterFriendsProviderTask implements Runnable {
     protected TwitterFollowingProvider provider;
     protected Twitter client;
     protected Long id;
+    protected String screenName;
 
     public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
         this.provider = provider;
@@ -55,12 +56,22 @@ public class TwitterFriendsProviderTask implements Runnable {
         this.id = id;
     }
 
+    public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+        this.provider = provider;
+        this.client = twitter;
+        this.screenName = screenName;
+    }
+
+
     @Override
     public void run() {
 
-        getFriends(id);
+        if( id != null )
+            getFriends(id);
+        if( screenName != null)
+            getFriends(screenName);
 
-        LOGGER.info(id + " Thread Finished");
+        LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
 
     }
 
@@ -115,4 +126,56 @@ public class TwitterFriendsProviderTask implements Runnable {
         } while (curser != 0 && keepTrying < 10);
     }
 
+    protected void getFriends(String screenName) {
+
+        int keepTrying = 0;
+
+        long curser = -1;
+
+        do
+        {
+            try
+            {
+                twitter4j.User follower4j;
+                String followerJson;
+                try {
+                    follower4j = client.users().showUser(screenName);
+                    followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+                } catch (TwitterException e) {
+                    LOGGER.error("Failure looking up " + screenName);
+                    break;
+                }
+
+                PagableResponseList<User> followeeList = client.friendsFollowers().getFriendsList(screenName, curser);
+
+                for (twitter4j.User followee4j : followeeList ) {
+
+                    String followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+
+                    try {
+                        Follow follow = new Follow()
+                                .withFollowee(mapper.readValue(followeeJson, org.apache.streams.twitter.pojo.User.class))
+                                .withFollower(mapper.readValue(followerJson, org.apache.streams.twitter.pojo.User.class));
+
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+                    } catch (JsonParseException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (JsonMappingException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (IOException e) {
+                        LOGGER.warn(e.getMessage());
+                    }
+                }
+                curser = followeeList.getNextCursor();
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        } while (curser != 0 && keepTrying < 10);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
deleted file mode 100644
index 4dd29cc..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
-
-    public TwitterFollowActivitySerializer() {}
-
-    private static TwitterFollowActivitySerializer instance = new TwitterFollowActivitySerializer();
-
-    public static TwitterFollowActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public Follow serialize(Activity deserialized) throws ActivitySerializerException {
-        return null;
-    }
-
-    @Override
-    public Activity deserialize(Follow event) throws ActivitySerializerException {
-
-        Activity activity = new Activity();
-        activity.setVerb("follow");
-        activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
-        activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Follow> serializedList) {
-        return null;
-    }
-}


[9/9] incubator-streams git commit: Merge branch 'STREAMS-235' into asf-master

Posted by sb...@apache.org.
Merge branch 'STREAMS-235' into asf-master


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a71653cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a71653cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a71653cf

Branch: refs/heads/master
Commit: a71653cff8d466928dc8fbfde4923a4cb92c4a76
Parents: 2b994de 835539a
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 2 16:21:26 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 2 16:21:26 2015 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            |   3 +-
 .../converter/TwitterDocumentClassifier.java    |  12 +-
 .../TwitterFollowActivityConverter.java         |  84 +++++++++
 .../provider/TwitterFollowingProvider.java      | 137 ++++++++++++++
 .../provider/TwitterFollowingProviderTask.java  | 160 ++++++++++++++++
 .../provider/TwitterFriendsProviderTask.java    | 181 +++++++++++++++++++
 .../TwitterUserInformationProvider.java         |   7 +-
 .../src/main/jsonschema/com/twitter/Follow.json |  14 ++
 .../TwitterActivityConverterProcessorTest.java  | 115 ------------
 .../test/TwitterActivityConvertersTest.java     | 139 +++++---------
 .../test/TwitterDocumentClassifierTest.java     |  11 ++
 11 files changed, 643 insertions(+), 220 deletions(-)
----------------------------------------------------------------------



[6/9] incubator-streams git commit: follow detection test

Posted by sb...@apache.org.
follow detection test


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bfbfc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bfbfc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bfbfc40

Branch: refs/heads/master
Commit: 4bfbfc400e94c27c847cb2efb670831c80d0a9d9
Parents: c14e335
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 20:29:11 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:29:11 2015 -0500

----------------------------------------------------------------------
 .../twitter/converter/TwitterDocumentClassifier.java     | 10 +++++++---
 .../twitter/test/TwitterDocumentClassifierTest.java      | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bfbfc40/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index bdd6682..7c8ed8c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -24,9 +24,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Follow;
-import org.apache.streams.twitter.pojo.*;
-import sun.reflect.generics.reflectiveObjects.LazyReflectiveObjectGenerator;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
 
 import java.io.IOException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bfbfc40/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
index 324a605..a55bd09 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
@@ -21,6 +21,7 @@ package org.apache.streams.twitter.test;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
@@ -42,6 +43,7 @@ public class TwitterDocumentClassifierTest {
     private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":fa
 lse,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors
 \":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
     private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682385797121,\"id_str\":\"410898682385797121\",\"text\":\"RT @hemocional: Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"163149656\",\"name\":\"Carolina\",\"screen_name\":\"_titinaok\",\"location\":\"Montevideo\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=N3v5vZ-tU1E\",\"description\":\"Tantas veces me defin\\u00ed ...Soy nada y todo a la vez\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":fal
 se,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/163149656\\/1379722210\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"pr
 ofile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":410898060206960640,\"id_str\":\"410898060206960640\",\"text\":\"Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"\\u003ca href=\\\"http:\\/\\/bufferapp.com\\\" rel=\\\"nofollow\\\"\\u003eBuffer\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":552929456,\"id_str\":\"552929456\",\"name\":\"Habilidad emocional\",\"screen_name\":\"hemocional\",\"location\":\"\",\"url\":\"http:\\/\\/www.hab
 ilidademocional.com\",\"description\":\"Pensamientos y reflexiones para ayudar a mirar la vida de una manera m\\u00e1s saludable y a crecer interiormente cada d\\u00eda m\\u00e1s. #InteligenciaEmocional #Psicolog\\u00eda\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg
 .com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/552929456\\/1383180255\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"he
 mocional\",\"name\":\"Habilidad emocional\",\"id\":552929456,\"id_str\":\"552929456\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
     private String delete = "{\"delete\":{\"status\":{\"id\":377518972486553600,\"user_id\":1249045572,\"id_str\":\"377518972486553600\",\"user_id_str\":\"1249045572\"}}}\n";
+    private String follow = "{\"follower\":{\"id\":1249045572},\"followee\":{\"id\":32386852}}\n";
     private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":32386852,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"Fred Gilkey\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_
 id_str\":null,\"id\":1541596700,\"source\":\"web\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"1541596700\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"4TYLove\",\"id_str\":\"32386852\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
 
     @Test
@@ -72,6 +74,15 @@ public class TwitterDocumentClassifierTest {
     }
 
     @Test
+    public void testDetectFollow() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if( !result.equals(Follow.class) )
+            Assert.fail();
+    }
+
+    @Test
     public void testDetectUser() {
         List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
         Assert.assertTrue(detected.size() == 1);


[2/9] incubator-streams git commit: cleanup post-277

Posted by sb...@apache.org.
cleanup post-277


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e5d50866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e5d50866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e5d50866

Branch: refs/heads/master
Commit: e5d50866be407e9a25ecf9936c0a16631d01ee2b
Parents: 43a3818
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 13:04:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 13:04:23 2015 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowersProviderTask.java    |  3 +--
 .../provider/TwitterFollowingProvider.java        | 18 ++++++++++++++++++
 .../provider/TwitterFriendsProviderTask.java      |  3 +--
 .../TwitterFollowActivitySerializer.java          |  4 +---
 4 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index c0f8e8a..9f91267 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -26,7 +26,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class TwitterFollowersProviderTask implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
 
-    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected TwitterFollowingProvider provider;
     protected Twitter client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index a4df278..0a62fc8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 5d3a7d2..408a614 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ public class TwitterFriendsProviderTask implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsProviderTask.class);
 
-    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected TwitterFollowingProvider provider;
     protected Twitter client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
index 60da85a..4dd29cc 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
@@ -26,14 +26,12 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
-
 public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
 
     public TwitterFollowActivitySerializer() {}


[5/9] incubator-streams git commit: follow Json detection

Posted by sb...@apache.org.
follow Json detection


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c14e335e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c14e335e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c14e335e

Branch: refs/heads/master
Commit: c14e335ede2d05c1265af609fbbfea196001802f
Parents: 3dceeb5
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 20:14:28 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:14:28 2015 -0500

----------------------------------------------------------------------
 .../streams/twitter/converter/TwitterDocumentClassifier.java       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c14e335e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index 9eda110..bdd6682 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -68,6 +68,8 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
             classList.add(FriendList.class);
         else if( objectNode.findValue("target_object") != null )
             classList.add(UserstreamEvent.class);
+        else if( objectNode.findValue("follower") != null && objectNode.findValue("followee") != null)
+            classList.add(Follow.class);
         else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
             classList.add(User.class);
         else


[8/9] incubator-streams git commit: PR feedback for https://github.com/apache/incubator-streams/pull/202

Posted by sb...@apache.org.
PR feedback for https://github.com/apache/incubator-streams/pull/202

Signed-off-by: Steve Blackmon (@steveblackmon) <sb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/835539af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/835539af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/835539af

Branch: refs/heads/master
Commit: 835539af9b791f1a0f7bf968bcb0507b75a63304
Parents: 8bf1cfa
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 2 16:08:24 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 2 16:08:24 2015 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            |   2 +-
 .../provider/TwitterFollowersProviderTask.java  | 181 -------------------
 .../provider/TwitterFollowingProvider.java      |  28 +--
 .../provider/TwitterFollowingProviderTask.java  | 160 ++++++++++++++++
 4 files changed, 168 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 26211d7..acdbf4a 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -88,7 +88,7 @@
         <dependency>
             <groupId>org.twitter4j</groupId>
             <artifactId>twitter4j-core</artifactId>
-            <version>4.0.1</version>
+            <version>4.0.3</version>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
deleted file mode 100644
index cb679a6..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.PagableResponseList;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterObjectFactory;
-
-import java.io.IOException;
-
-/**
- *  Retrieve recent posts for a single user id.
- */
-public class TwitterFollowersProviderTask implements Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
-
-    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected TwitterFollowingProvider provider;
-    protected Twitter client;
-    protected Long id;
-    protected String screenName;
-
-    public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
-        this.provider = provider;
-        this.client = twitter;
-        this.id = id;
-    }
-
-    public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
-        this.provider = provider;
-        this.client = twitter;
-        this.screenName = screenName;
-    }
-
-
-    @Override
-    public void run() {
-
-        if( id != null )
-            getFollowers(id);
-        if( screenName != null)
-            getFollowers(screenName);
-
-        LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
-
-    }
-
-    protected void getFollowers(Long id) {
-
-        int keepTrying = 0;
-
-        long curser = -1;
-
-        do
-        {
-            try
-            {
-                twitter4j.User followee4j;
-                String followeeJson;
-                try {
-                    followee4j = client.users().showUser(id);
-                    followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
-                } catch (TwitterException e) {
-                    LOGGER.error("Failure looking up " + id);
-                    break;
-                }
-
-                PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(id.longValue(), curser);
-
-                for (twitter4j.User follower4j : followerList) {
-
-                    String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
-
-                    try {
-                        Follow follow = new Follow()
-                                .withFollowee(mapper.readValue(followeeJson, User.class))
-                                .withFollower(mapper.readValue(followerJson, User.class));
-
-                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-                    } catch (JsonParseException e) {
-                        LOGGER.warn(e.getMessage());
-                    } catch (JsonMappingException e) {
-                        LOGGER.warn(e.getMessage());
-                    } catch (IOException e) {
-                        LOGGER.warn(e.getMessage());
-                    }
-                }
-                curser = followerList.getNextCursor();
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-            }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-            }
-        } while (curser != 0 && keepTrying < 10);
-    }
-
-    protected void getFollowers(String screenName) {
-
-        int keepTrying = 0;
-
-        long curser = -1;
-
-        do
-        {
-            try
-            {
-                twitter4j.User followee4j;
-                String followeeJson;
-                try {
-                    followee4j = client.users().showUser(screenName);
-                    followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
-                } catch (TwitterException e) {
-                    LOGGER.error("Failure looking up " + screenName);
-                    break;
-                }
-
-                PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(screenName, curser);
-
-                for (twitter4j.User follower4j : followerList) {
-
-                    String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
-
-                    try {
-                        Follow follow = new Follow()
-                                .withFollowee(mapper.readValue(followeeJson, User.class))
-                                .withFollower(mapper.readValue(followerJson, User.class));
-
-                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-                    } catch (JsonParseException e) {
-                        LOGGER.warn(e.getMessage());
-                    } catch (JsonMappingException e) {
-                        LOGGER.warn(e.getMessage());
-                    } catch (IOException e) {
-                        LOGGER.warn(e.getMessage());
-                    }
-                }
-                curser = followerList.getNextCursor();
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-            }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-            }
-        } while (curser != 0 && keepTrying < 10);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 70c92c9..272d4b7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -20,7 +20,6 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -80,33 +79,20 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     protected void submitFollowingThreads(Long[] ids) {
         Twitter client = getTwitterClient();
 
-        if( getConfig().getEndpoint().equals("friends") ) {
-            for (int i = 0; i < ids.length; i++) {
-                TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, ids[i]);
-                executor.submit(providerTask);
-            }
-        } else if( getConfig().getEndpoint().equals("followers") ) {
-            for (int i = 0; i < ids.length; i++) {
-                TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, ids[i]);
-                executor.submit(providerTask);
-            }
+        for (int i = 0; i < ids.length; i++) {
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint());
+            executor.submit(providerTask);
         }
     }
 
     protected void submitFollowingThreads(String[] screenNames) {
         Twitter client = getTwitterClient();
 
-        if( getConfig().getEndpoint().equals("friends") ) {
-            for (int i = 0; i < screenNames.length; i++) {
-                TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, screenNames[i]);
-                executor.submit(providerTask);
-            }
-        } else if( getConfig().getEndpoint().equals("followers") ) {
-            for (int i = 0; i < screenNames.length; i++) {
-                TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, screenNames[i]);
-                executor.submit(providerTask);
-            }
+        for (int i = 0; i < screenNames.length; i++) {
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint());
+            executor.submit(providerTask);
         }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
new file mode 100644
index 0000000..ea737c5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+import java.io.IOException;
+
+/**
+ *  Retrieve recent posts for a single user id.
+ */
+public class TwitterFollowingProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected TwitterFollowingProvider provider;
+    protected Twitter client;
+    protected Long id;
+    protected String screenName;
+    protected String endpoint;
+
+    private int max_per_page = 200;
+
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint) {
+        this.provider = provider;
+        this.client = twitter;
+        this.id = id;
+        this.endpoint = endpoint;
+    }
+
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint) {
+        this.provider = provider;
+        this.client = twitter;
+        this.screenName = screenName;
+        this.endpoint = endpoint;
+    }
+
+
+    @Override
+    public void run() {
+
+        Preconditions.checkArgument(id != null || screenName != null);
+
+        if( id != null )
+            getFollowing(id);
+        else if( screenName != null)
+            getFollowing(screenName);
+
+        LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
+
+    }
+
+    protected void getFollowing(Long id) {
+
+        Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+
+        int keepTrying = 0;
+
+        long curser = -1;
+
+        do
+        {
+            try
+            {
+                twitter4j.User followee4j;
+                String followeeJson;
+                try {
+                    followee4j = client.users().showUser(id);
+                    followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+                } catch (TwitterException e) {
+                    LOGGER.error("Failure looking up " + id);
+                    break;
+                }
+
+                PagableResponseList<twitter4j.User> list = null;
+                if( endpoint.equals("followers") )
+                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
+                else if( endpoint.equals("friends") )
+                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+
+                Preconditions.checkNotNull(list);
+                Preconditions.checkArgument(list.size() > 0);
+
+                for (twitter4j.User follower4j : list) {
+
+                    String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+                    try {
+                        Follow follow = new Follow()
+                                .withFollowee(mapper.readValue(followeeJson, User.class))
+                                .withFollower(mapper.readValue(followerJson, User.class));
+
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+                    } catch (JsonParseException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (JsonMappingException e) {
+                        LOGGER.warn(e.getMessage());
+                    } catch (IOException e) {
+                        LOGGER.warn(e.getMessage());
+                    }
+                }
+                if( list.size() == max_per_page )
+                    curser = list.getNextCursor();
+                else break;
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        } while (curser != 0 && keepTrying < 10);
+    }
+
+    protected void getFollowing(String screenName) {
+
+        twitter4j.User user = null;
+        try {
+            user = client.users().showUser(screenName);
+        } catch (TwitterException e) {
+            LOGGER.error("Failure looking up " + id);
+        }
+        Preconditions.checkNotNull(user);
+        getFollowing(user.getId());
+    }
+
+
+}