You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:39 UTC

[1/8] incubator-streams git commit: related to STREAMS-403

Repository: incubator-streams
Updated Branches:
  refs/heads/master 8bb4ca8a6 -> 4febde277


related to STREAMS-403


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9bf8ef9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9bf8ef9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9bf8ef9b

Branch: refs/heads/master
Commit: 9bf8ef9ba566351a855366875f0253059c0473ed
Parents: 8bb4ca8
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 15:06:14 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 15:11:44 2016 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterErrorHandler.java   |  28 +++--
 .../provider/TwitterFollowingProvider.java      |   9 +-
 .../provider/TwitterFollowingProviderTask.java  |   8 +-
 .../provider/TwitterTimelineProvider.java       |   8 +-
 .../provider/TwitterTimelineProviderTask.java   |  20 +++-
 .../TwitterUserInformationProvider.java         | 103 ++++++++++++++-----
 .../src/main/jsonschema/com/twitter/Follow.json |   1 +
 .../com/twitter/TwitterConfiguration.json       |  10 ++
 8 files changed, 142 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 51236ba..90f6b62 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -18,6 +18,9 @@
 
 package org.apache.streams.twitter.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
@@ -32,11 +35,18 @@ public class TwitterErrorHandler
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
 
     // selected because 3 * 5 + n >= 15 for positive n
-    protected static final long retry = 3*60*1000;
+    protected static long retry =
+            new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+                    StreamsConfigurator.getConfig().getConfig("twitter")
+            ).getRetrySleepMs();
+    protected static long retryMax =
+            new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+                    StreamsConfigurator.getConfig().getConfig("twitter")
+            ).getRetryMax();
 
     @Deprecated
     public static int handleTwitterError(Twitter twitter, Exception exception) {
-        return handleTwitterError(twitter, null, exception);
+        return handleTwitterError( twitter, null, exception);
     }
 
     public static int handleTwitterError(Twitter twitter, Long id, Exception exception)
@@ -82,11 +92,11 @@ public class TwitterErrorHandler
                         LOGGER.warn("User does not exist: {}", id);
                     else
                         LOGGER.warn("User does not exist");
-                    return 100;
+                    return (int)retryMax;
                 }
                 else
                 {
-                    return 1;
+                    return (int)retryMax/3;
                 }
             }
             else
@@ -94,7 +104,7 @@ public class TwitterErrorHandler
                 if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
                 {
                     // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
-                    return 5;
+                    return (int)retryMax/3;
                 }
                 else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) {
                     // This is a 401 reflecting credentials don't have access to the requested resource.
@@ -102,7 +112,7 @@ public class TwitterErrorHandler
                         LOGGER.warn("Authentication Exception accessing id: {}", id);
                     else
                         LOGGER.warn("Authentication Exception");
-                    return 5;
+                    return (int)retryMax;
                 }
                 else
                 {
@@ -111,19 +121,19 @@ public class TwitterErrorHandler
                     LOGGER.warn("   Access: {}", e.getAccessLevel());
                     LOGGER.warn("     Code: {}", e.getExceptionCode());
                     LOGGER.warn("  Message: {}", e.getLocalizedMessage());
-                    return 1;
+                    return (int)retryMax/10;
                 }
             }
         }
         else if(exception instanceof RuntimeException)
         {
             LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
-            return 1;
+            return (int)retryMax/3;
         }
         else
         {
             LOGGER.info("Completely Unknown Exception: {}", exception);
-            return 1;
+            return (int)retryMax/3;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index dc15407..27c8526 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,6 +58,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     }
 
     public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
+        super(config);
         this.config = config;
     }
 
@@ -130,7 +132,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     }
 
     protected Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+        return new ConcurrentLinkedQueue<StreamsDatum>();
     }
 
     @Override
@@ -149,4 +151,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
             lock.readLock().unlock();
         }
     }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index 5397757..cc71d48 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -139,7 +139,7 @@ public class TwitterFollowingProviderTask implements Runnable {
                         Preconditions.checkNotNull(follow);
 
                         if( count < provider.getConfig().getMaxItems()) {
-                            provider.addDatum(new StreamsDatum(follow));
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
                             count++;
                         }
 
@@ -157,7 +157,7 @@ public class TwitterFollowingProviderTask implements Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
             }
-        } while (curser != 0 && keepTrying < 10 && count < provider.getConfig().getMaxItems());
+        } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
     }
 
     private void collectIds(Long id) {
@@ -196,7 +196,7 @@ public class TwitterFollowingProviderTask implements Runnable {
                         Preconditions.checkNotNull(follow);
 
                         if( count < provider.getConfig().getMaxItems()) {
-                            provider.addDatum(new StreamsDatum(follow));
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
                             count++;
                         }
                     } catch (Exception e) {
@@ -213,7 +213,7 @@ public class TwitterFollowingProviderTask implements Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
             }
-        } while (curser != 0 && keepTrying < 10 && count < provider.getConfig().getMaxItems());
+        } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
     }
 
     protected void getFollowing(String screenName) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 26ba887..a8eada4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -109,7 +109,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkArgument(!ids.isEmpty());
 
-        LOGGER.info("readCurrent");
+        LOGGER.debug("{} - readCurrent", ids);
 
         submitTimelineThreads(ids.toArray(new Long[0]));
 
@@ -150,10 +150,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     public StreamsResultSet readCurrent() {
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
         StreamsResultSet result;
 
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
         try {
             lock.writeLock().lock();
             result = new StreamsResultSet(providerQueue);
@@ -176,7 +176,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     protected Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+        return new LinkedBlockingQueue<StreamsDatum>();
     }
 
     public StreamsResultSet readNew(BigInteger sequence) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index adc37ca..b8d5e1d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,10 +18,17 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.*;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.*;
+import twitter4j.Status;
 
 import java.util.List;
 
@@ -32,6 +39,8 @@ public class TwitterTimelineProviderTask implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
+    private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
     protected TwitterTimelineProvider provider;
     protected Twitter client;
     protected Long id;
@@ -49,6 +58,8 @@ public class TwitterTimelineProviderTask implements Runnable {
         List<Status> statuses = null;
         int count = 0;
 
+        LOGGER.info(id + " Thread Starting");
+
         do
         {
             int keepTrying = 0;
@@ -67,10 +78,15 @@ public class TwitterTimelineProviderTask implements Runnable {
                     statuses = client.getUserTimeline(id, paging);
 
                     for (Status tStat : statuses) {
-                        String json = TwitterObjectFactory.getRawJSON(tStat);
 
+                        String json = TwitterObjectFactory.getRawJSON(tStat);
                         if( count < provider.getConfig().getMaxItems() ) {
-                            provider.addDatum(new StreamsDatum(json));
+                            try {
+                                org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
+                                ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
+                            } catch(Exception exception) {
+                                LOGGER.warn("Failed to read document as Tweet ", tStat);
+                            }
                             count++;
                         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index c4cc96d..78eb3e6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,17 +18,24 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -36,7 +43,6 @@ import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterFactory;
-import twitter4j.User;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 
@@ -48,17 +54,27 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 
 public class TwitterUserInformationProvider implements StreamsProvider, Serializable
 {
 
     public static final String STREAMS_ID = "TwitterUserInformationProvider";
 
+    private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
     private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
 
+    public static final int MAX_NUMBER_WAITING = 1000;
+
     private TwitterUserInformationConfiguration config;
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile Queue<StreamsDatum> providerQueue;
 
     public TwitterUserInformationConfiguration getConfig()              { return config; }
 
@@ -81,7 +97,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
 
     public TwitterUserInformationProvider() {
-        this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
+        this.config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
     }
 
     public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) {
@@ -99,7 +115,20 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     @Override
     public void startStream() {
+
+        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+        LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
+
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
+
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
+
         running.set(true);
+
+        executor.shutdown();
     }
 
     protected void loadBatch(Long[] ids) {
@@ -116,9 +145,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
                 for(int i = 0; i < ids.length; i++)
                     toQuery[i] = ids[i];
 
-                for (User tStat : client.lookupUsers(toQuery)) {
-                    String json = DataObjectFactory.getRawJSON(tStat);
-                    ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
+                for (twitter4j.User tUser : client.lookupUsers(toQuery)) {
+                    String json = DataObjectFactory.getRawJSON(tUser);
+                    try {
+                        User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
+                    } catch(Exception exception) {
+                        LOGGER.warn("Failed to read document as User ", tUser);
+                    }
                 }
                 keepTrying = 10;
             }
@@ -141,9 +175,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         {
             try
             {
-                for (User tStat : client.lookupUsers(ids)) {
-                    String json = DataObjectFactory.getRawJSON(tStat);
-                    providerQueue.offer(new StreamsDatum(json));
+                for (twitter4j.User tUser : client.lookupUsers(ids)) {
+                    String json = DataObjectFactory.getRawJSON(tUser);
+                    try {
+                        User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
+                    } catch(Exception exception) {
+                        LOGGER.warn("Failed to read document as User ", tUser);
+                    }
                 }
                 keepTrying = 10;
             }
@@ -158,30 +197,34 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     public StreamsResultSet readCurrent() {
 
-        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        while(idsBatches.hasNext())
-            loadBatch(idsBatches.next());
-
-        while(screenNameBatches.hasNext())
-            loadBatch(screenNameBatches.next());
-
+        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
 
-        LOGGER.info("Finished.  Cleaning up...");
+        StreamsResultSet result;
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        try {
+            lock.writeLock().lock();
+            result = new StreamsResultSet(providerQueue);
+            result.setCounter(new DatumStatusCounter());
+            providerQueue = constructQueue();
+            LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
+        if( providerQueue.isEmpty() && executor.isTerminated()) {
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
-        LOGGER.info("Exiting");
+            running.set(false);
+        }
 
         return result;
 
     }
 
+    protected Queue<StreamsDatum> constructQueue() {
+        return new LinkedBlockingQueue<StreamsDatum>();
+    }
+
     public StreamsResultSet readNew(BigInteger sequence) {
         LOGGER.debug("{} readNew", STREAMS_ID);
         throw new NotImplementedException();
@@ -225,6 +268,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         if( o instanceof TwitterFollowingConfiguration )
             config = (TwitterUserInformationConfiguration) o;
 
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
         Preconditions.checkNotNull(providerQueue);
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());
         Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
@@ -276,7 +326,10 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         if(screenNames.size() > 0)
             screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
 
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+        if(ids.size() + screenNames.size() > 0)
+            executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+        else
+            executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
 
         this.idsBatches = idsBatches.iterator();
         this.screenNameBatches = screenNameBatches.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
index b667540..320db12 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
@@ -6,6 +6,7 @@
     ],
     "id": "#",
     "javaType" : "org.apache.streams.twitter.pojo.Follow",
+    "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "follower": {
             "$ref": "User.json"

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
index 5d911af..69048d1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
@@ -72,6 +72,16 @@
                     "type": "string"
                 }
             }
+        },
+        "retrySleepMs": {
+             "type": "integer",
+             "description": "ms to sleep when hitting a rate limit",
+             "default": 100000
+         },
+         "retryMax": {
+             "type": "integer",
+             "description": "retry budget; a request is abandoned once its accumulated error weight reaches this value",
+             "default": 10
         }
    }
 }
\ No newline at end of file


[3/8] incubator-streams git commit: employ args to simplify test and provider command line

Posted by sb...@apache.org.
employ args to simplify test and provider command line


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d9e58cdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d9e58cdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d9e58cdd

Branch: refs/heads/master
Commit: d9e58cdd67020520d592aad621b3aff6a8249537
Parents: 0813b11
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 20:25:44 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 20:25:44 2016 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            | 12 ++++
 .../provider/TwitterTimelineProvider.java       | 64 +++++++++++++++-----
 .../provider/TwitterTimelineProviderIT.java     | 35 +----------
 .../resources/TwitterTimelineProviderIT.conf    |  4 ++
 .../resources/TwitterTimelineProviderTest.conf  |  4 --
 5 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 903d3a7..7ec0908 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -209,6 +209,18 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-resources-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.5.0</version>
+                <executions>
+                  <execution>
+                    <goals>
+                      <goal>exec</goal>
+                    </goals>
+                  </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b8653b8..61cddaf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -24,8 +24,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -42,6 +46,11 @@ import org.slf4j.LoggerFactory;
 import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
@@ -55,8 +64,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  *  Retrieve recent posts from a list of user ids or names.
+ *
+ *  To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  twitter.oauth.consumerKey
+ *  twitter.oauth.consumerSecret
+ *  twitter.oauth.accessToken
+ *  twitter.oauth.accessTokenSecret
+ *  twitter.info
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     public final static String STREAMS_ID = "TwitterTimelineProvider";
 
@@ -64,30 +87,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
 
     private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
-    public static void main(String[] args) {
-        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
         TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
-        provider.run();
-    }
 
-    @Override
-    public void run() {
-        prepare(config);
-        startStream();
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
         do {
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-            Iterator<StreamsDatum> iterator = readCurrent().iterator();
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
             while(iterator.hasNext()) {
                 StreamsDatum datum = iterator.next();
                 String json;
                 try {
                     json = MAPPER.writeValueAsString(datum.getDocument());
-                    System.out.println(json);
+                    outStream.println(json);
                 } catch (JsonProcessingException e) {
                     System.err.println(e.getMessage());
                 }
             }
-        } while( isRunning());
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
     }
 
     public static final int MAX_NUMBER_WAITING = 10000;
@@ -189,7 +225,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
 
         StreamsResultSet result;
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        LOGGER.debug("Providing {} docs", providerQueue.size());
 
         try {
             lock.writeLock().lock();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
index e0f3b6a..f21a87e 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -43,26 +43,10 @@ public class TwitterTimelineProviderIT {
     @Test
     public void testTwitterTimelineProvider() throws Exception {
 
-        PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
-        PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+        String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
+        String outfile = "./target/test-classes/TwitterTimelineProviderIT.txt";
 
-        System.setOut(stdout);
-        System.setErr(stderr);
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
-        TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
-
-        TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
-        provider.run();
-
-        stdout.flush();
-        stderr.flush();
+        TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
 
         File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
         assert (out.exists());
@@ -76,18 +60,5 @@ public class TwitterTimelineProviderIT {
 
         assert (outCounter.getLineNumber() == 1000);
 
-        File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
-        assert (err.exists());
-        assert (err.canRead());
-        assert (err.isFile());
-
-        FileReader errReader = new FileReader(err);
-        LineNumberReader errCounter = new LineNumberReader(errReader);
-
-        while(errCounter.readLine() != null) {}
-
-        assert (errCounter.getLineNumber() == 0);
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+  18055613
+]
+twitter.max_items = 1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
deleted file mode 100644
index a7862c4..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
+++ /dev/null
@@ -1,4 +0,0 @@
-twitter.info = [
-  18055613
-]
-twitter.max_items = 1000
\ No newline at end of file


[2/8] incubator-streams git commit: example of STREAMS-415 using twitter

Posted by sb...@apache.org.
example of STREAMS-415 using twitter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0813b11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0813b11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0813b11e

Branch: refs/heads/master
Commit: 0813b11edd535322cbabafd9a91e77136812e8bb
Parents: 9bf8ef9
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 17:37:06 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 17:37:24 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterTimelineProvider.java       | 40 ++++++++-
 .../src/site/markdown/index.md                  | 18 ++++
 .../provider/TwitterTimelineProviderIT.java     | 93 ++++++++++++++++++++
 .../provider/TwitterTimelineProviderTest.java   | 39 --------
 .../resources/TwitterTimelineProviderTest.conf  |  4 +
 5 files changed, 154 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a8eada4..b8653b8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,15 +18,23 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -48,12 +56,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  *  Retrieve recent posts from a list of user ids or names.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
 
     public final static String STREAMS_ID = "TwitterTimelineProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
+    private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+    public static void main(String[] args) {
+        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+        TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+        provider.run();
+    }
+
+    @Override
+    public void run() {
+        prepare(config);
+        startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            Iterator<StreamsDatum> iterator = readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    System.out.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( isRunning());
+    }
+
     public static final int MAX_NUMBER_WAITING = 10000;
 
     private TwitterUserInformationConfiguration config;
@@ -116,6 +152,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         running.set(true);
 
         executor.shutdown();
+
     }
 
     public boolean shouldContinuePulling(List<Status> statuses) {
@@ -304,4 +341,5 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             lock.readLock().unlock();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
index ec5d1c8..4249956 100644
--- a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
@@ -31,6 +31,24 @@ streams-provider-twitter contains schema definitions, providers, conversions, an
 | TwitterStreamProvider [TwitterStreamProvider.html](apidocs/org/apache/streams/twitter/TwitterStreamProvider.html "javadoc") | [TwitterStreamConfiguration.json](com/twitter/TwitterStreamConfiguration.json "TwitterStreamConfiguration.json") [TwitterUserInformationConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterStreamConfiguration.html "javadoc") | [sample.conf](sample.conf "sample.conf")<br/>[userstream.conf](userstream.conf "userstream.conf") |
 | TwitterFollowingProvider [TwitterFollowingProvider.html](apidocs/org/apache/streams/twitter/TwitterFollowingConfiguration.html "javadoc") | [TwitterFollowingConfiguration.json](com/twitter/TwitterFollowingConfiguration.json "TwitterFollowingConfiguration.json") [TwitterFollowingConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterFollowingConfiguration.html "javadoc") | [friends.conf](friends.conf "friends.conf")<br/>[followers.conf](followers.conf "followers.conf") |
 
+Test:
+-----
+
+Create a local file `application.conf` with valid Twitter credentials:
+
+    twitter {
+      oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+      }
+    }
+    
+Build with integration testing enabled, using your credentials:
+
+    mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..e0f3b6a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TwitterTimelineProviderIT {
+
+    @Test
+    public void testTwitterTimelineProvider() throws Exception {
+
+        PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
+        PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+
+        System.setOut(stdout);
+        System.setErr(stderr);
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+        StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
+
+        TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
+        provider.run();
+
+        stdout.flush();
+        stderr.flush();
+
+        File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() == 1000);
+
+        File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
+        assert (err.exists());
+        assert (err.canRead());
+        assert (err.isFile());
+
+        FileReader errReader = new FileReader(err);
+        LineNumberReader errCounter = new LineNumberReader(errReader);
+
+        while(errCounter.readLine() != null) {}
+
+        assert (errCounter.getLineNumber() == 0);
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
deleted file mode 100644
index 0cdede0..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderTest {
-
-    @Test
-    public void consolidateToIDsTest() {
-        List<String> ids = Arrays.asList("2342342", "", "144523", null);
-
-        TwitterUserInformationConfiguration twitterUserInformationConfiguration = new TwitterUserInformationConfiguration();
-        twitterUserInformationConfiguration.setInfo(ids);
-        TwitterTimelineProvider twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
-
-        twitterTimelineProvider.consolidateToIDs();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+  18055613
+]
+twitter.max_items = 1000
\ No newline at end of file


[7/8] incubator-streams git commit: add 3 more provider ITs, reorganize test packages

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
new file mode 100644
index 0000000..698a2c8
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
@@ -0,0 +1,1002 @@
+twitter.info = [
+  3424266646
+  3277467241
+  3244517214
+  29953647
+  63818319
+  1528436754
+  405580894
+  322778026
+  172382176
+  633076833
+  703735608
+  2347223440
+  2907929487
+  950240089
+  1418546592
+  3318418717
+  2848958704
+  1120797264
+  933623324
+  2977700375
+  328204518
+  585131136
+  2868789793
+  158347647
+  2915413161
+  2217367263
+  2534019247
+  3033565239
+  377379801
+  2525341814
+  3123827524
+  1840932523
+  3307643975
+  3301777832
+  961987748
+  3205632255
+  2799469322
+  17730681
+  1495242662
+  1909516123
+  263933760
+  312651511
+  2479527469
+  2357151036
+  346433828
+  44801893
+  1049697306
+  2779673194
+  18323141
+  2172488902
+  2373431930
+  1038322550
+  2946211549
+  2911057543
+  1186036284
+  2878076317
+  1312950464
+  57323685
+  32929857
+  301933631
+  2852217152
+  330422649
+  98470876
+  933125156
+  3237125761
+  914882005
+  1560239652
+  900444860
+  402918702
+  1820690166
+  3074359086
+  353183684
+  528544881
+  1881638161
+  2751762993
+  3161315692
+  3305680079
+  1721613488
+  513068659
+  627186234
+  3203648416
+  1541163325
+  1882043502
+  29071727
+  610104090
+  2819781014
+  2909115204
+  213886397
+  3249385591
+  3086875073
+  87040031
+  2202487475
+  334896132
+  49163181
+  3433984816
+  543969362
+  489445461
+  855051894
+  2792040175
+  117051455
+  438599410
+  1387329846
+  711595782
+  3230662766
+  2766672269
+  2926781875
+  863203928
+  517199566
+  201645935
+  1555939147
+  2943152669
+  1324775431
+  400234897
+  2347416842
+  1558112510
+  474415350
+  2153710970
+  1408335014
+  3633713483
+  3166021013
+  3530993294
+  332598229
+  308252069
+  3317826986
+  572175644
+  1718271572
+  2869090090
+  23725109
+  1926137280
+  1486830500
+  743080386
+  3250479720
+  2560441544
+  2715649872
+  287089153
+  18761334
+  2305577745
+  724860668
+  193306049
+  2615761979
+  2463299598
+  1436916012
+  919019185
+  90502449
+  50689522
+  1383774679
+  612784850
+  410319975
+  833440153
+  442322844
+  2181167094
+  94012832
+  112748352
+  1474618075
+  158262669
+  2391506308
+  882502026
+  2693660146
+  2971933908
+  55271184
+  2287356556
+  2895756090
+  407147132
+  3262181
+  313317193
+  2729137002
+  2939122360
+  2751601568
+  1215082350
+  124866576
+  274292311
+  3310301042
+  95407473
+  24993769
+  1342908648
+  1805339413
+  3118252036
+  893269387
+  1481149014
+  463288019
+  75008083
+  2895489727
+  965493739
+  278637248
+  1937513246
+  422218268
+  3320995462
+  78682286
+  2777069098
+  2909553730
+  2914338670
+  1251667531
+  2764034755
+  532659717
+  269002510
+  29373713
+  358075450
+  633880614
+  200374379
+  141628294
+  1513028977
+  116798791
+  2937455354
+  246194623
+  793925970
+  115594167
+  82463176
+  324774974
+  185844856
+  2462295999
+  3555105016
+  1029169117
+  2689309484
+  1587145976
+  1607241271
+  3032276402
+  183916933
+  63766245
+  151217255
+  2781098109
+  252081559
+  1608788256
+  41984573
+  1896587353
+  40136999
+  295505814
+  384867933
+  116947371
+  255703939
+  2687800732
+  76543916
+  881649782
+  2765729924
+  1715695669
+  1965383022
+  2888214228
+  21820514
+  1727966414
+  2581992818
+  103999565
+  741018846
+  446792386
+  2568989424
+  2780674777
+  465934916
+  3378294885
+  2885604327
+  3336273419
+  130742941
+  2327629099
+  1103818104
+  3050036073
+  2882456842
+  2702914248
+  2153674818
+  132825659
+  289758699
+  2995946100
+  3027449217
+  2708029160
+  1529367002
+  608170333
+  140446819
+  2790688993
+  1597308192
+  14462028
+  104062608
+  370274893
+  356145607
+  566542629
+  112587243
+  39372070
+  146853060
+  2440984657
+  3074554539
+  204701034
+  887623447
+  1971521630
+  2457208175
+  466113358
+  1574643830
+  1465533884
+  2500404589
+  1633154150
+  1349117870
+  1658071267
+  593022891
+  3094177813
+  1304672510
+  3385525697
+  2916225552
+  2759773715
+  1369215552
+  1058390078
+  2532850321
+  351483656
+  1902796704
+  113000738
+  2241245557
+  2416606754
+  408729540
+  2530294556
+  2936808249
+  3138999692
+  2679987883
+  1448537377
+  2524773906
+  942079406
+  2217584389
+  3059427504
+  3028507725
+  632766658
+  3302663431
+  2914832897
+  93487101
+  2786054379
+  1339647769
+  531402307
+  402066474
+  337936675
+  2760568625
+  1385916396
+  2595560922
+  421910477
+  1713100813
+  352016040
+  415247994
+  1883606209
+  2974994111
+  1118022211
+  3096979637
+  711889867
+  262890561
+  233810062
+  1877177168
+  964106670
+  164985413
+  2920420361
+  318936782
+  3289826764
+  145873735
+  2523059919
+  2409896179
+  2292047201
+  285674825
+  2765549780
+  2359541905
+  2419103894
+  358884588
+  206231205
+  136500778
+  1397885138
+  2625422097
+  2524578002
+  604278657
+  2625634867
+  73168019
+  407448958
+  189276174
+  2507896925
+  80880449
+  520177827
+  418469102
+  2925075456
+  615730636
+  2995998941
+  2697270934
+  497135011
+  2944598402
+  428706893
+  1345291712
+  388751708
+  130092079
+  2984741882
+  1047514436
+  15927135
+  2884357840
+  294362779
+  2870985800
+  1720400449
+  130027314
+  2970518577
+  240923858
+  1613498838
+  708321211
+  1403382426
+  2602186970
+  1596855998
+  280062526
+  2716454552
+  268720451
+  2869044811
+  1911762488
+  392373280
+  2151082712
+  2770919004
+  231541900
+  60122778
+  390006102
+  240167506
+  1558314660
+  221608257
+  852829933
+  461669243
+  239778483
+  502146157
+  1471963970
+  276426707
+  2336546150
+  323595235
+  128670043
+  1308641714
+  1411112756
+  3011727217
+  3082006921
+  450537474
+  2673101407
+  2416030447
+  51952627
+  708057486
+  833620748
+  3024957797
+  2147572362
+  1712467098
+  2899300501
+  1348351772
+  2923114629
+  2779232814
+  21306308
+  1466314507
+  1224588289
+  81307783
+  42717316
+  315972617
+  434649827
+  105839296
+  366063496
+  34045892
+  3076447389
+  92437198
+  3124335006
+  1444393410
+  351737762
+  1919360383
+  2836048345
+  1670939112
+  722140159
+  92939425
+  2932728756
+  2831872033
+  1354255123
+  1689738186
+  463578260
+  2881582438
+  912252510
+  3226221887
+  390827200
+  269169237
+  1450007192
+  2735984326
+  3029836305
+  28291382
+  785668627
+  567287970
+  1480004420
+  131927864
+  2958631308
+  488490020
+  2603422688
+  3186614985
+  177373618
+  2466506329
+  2651294251
+  3367170684
+  2673870882
+  369098635
+  242011326
+  18099277
+  1922210574
+  3093762445
+  470634878
+  1674607392
+  2920526283
+  3261677580
+  2192187078
+  485599960
+  1854850729
+  95198467
+  2228217740
+  2171528344
+  2957461230
+  226615737
+  1624183567
+  158597677
+  2909224690
+  19278114
+  2488284258
+  2777071149
+  1598064697
+  2740691127
+  3100908480
+  1147010126
+  2741161553
+  439971668
+  3247227273
+  2884261062
+  3127250575
+  2942021278
+  539428196
+  409599986
+  3161801331
+  2328613860
+  1903013437
+  313082004
+  2580495721
+  209464435
+  600172085
+  339541217
+  62219810
+  583287316
+  295891933
+  561683767
+  229192352
+  1357869918
+  235438136
+  1599249169
+  583879210
+  507744802
+  1696336261
+  2323537206
+  36882220
+  541528426
+  956202559
+  387936537
+  211658842
+  2685186010
+  2581656488
+  391154378
+  122932105
+  409764153
+  129737967
+  2848806360
+  3054860719
+  372199585
+  2316121597
+  703345746
+  3335505287
+  2466151422
+  380038166
+  420561214
+  2977085351
+  110955327
+  3004295886
+  2362857361
+  3053844460
+  3182081552
+  324208260
+  2571790321
+  1061498868
+  2187395299
+  2187482779
+  3096652530
+  2538239672
+  3809634552
+  2306848839
+  1544061547
+  151075965
+  3250238556
+  16157689
+  1692663644
+  1356000732
+  436774994
+  45503055
+  1086037316
+  2798297775
+  2923485772
+  58731726
+  211816170
+  885013716
+  2608529078
+  2954917057
+  2271021600
+  173743066
+  451543575
+  3219728436
+  399824828
+  2464688153
+  2541069631
+  1522892262
+  3167829845
+  944851321
+  2471474509
+  68073858
+  1496221376
+  13979882
+  2218792189
+  302123873
+  2845915546
+  431402814
+  1364254945
+  2711277666
+  2766696876
+  2495441323
+  2844317433
+  138009079
+  2578631100
+  478167529
+  1222728360
+  1323688411
+  2883066187
+  2443554697
+  411631689
+  68537682
+  1027019269
+  1660752493
+  987324488
+  2764106926
+  2184511674
+  103419315
+  2310456424
+  1572938088
+  2554895281
+  34138105
+  2942100621
+  160517898
+  285075974
+  2260805169
+  19390498
+  301696842
+  2588239985
+  2886588596
+  2962622367
+  1867897483
+  2827053488
+  1447767319
+  2924491293
+  167327096
+  3309592402
+  2795575638
+  578758971
+  2888665561
+  30542348
+  1437049609
+  2242541566
+  74354017
+  58900854
+  2159055031
+  246517688
+  2916873012
+  1110055280
+  562430843
+  761797794
+  1648208552
+  301483343
+  2896842048
+  522103295
+  1578517986
+  2659610776
+  2890560429
+  1427665578
+  268363160
+  563709041
+  2172300002
+  2791262431
+  3039809351
+  2914940301
+  2746560353
+  2892191616
+  71596845
+  233770184
+  1530949130
+  105906110
+  755347622
+  490836906
+  357603454
+  324517203
+  2835402315
+  3285479894
+  86368327
+  238219970
+  3153173945
+  2732361234
+  2357626327
+  346602505
+  13732632
+  44055265
+  2998032219
+  482072312
+  1721073866
+  1386781034
+  168194206
+  1213443144
+  181296114
+  942598400
+  2955577216
+  582056669
+  747540468
+  2371722140
+  360824004
+  3023711736
+  207032580
+  2748107976
+  464428175
+  3150849096
+  85450014
+  2840066340
+  2287819200
+  240931426
+  553606800
+  397876544
+  2195298230
+  2601812005
+  3013344739
+  17599363
+  1572639314
+  3377673407
+  303420278
+  2811879995
+  526860891
+  346333874
+  113568311
+  705488304
+  3238867619
+  333772149
+  373309716
+  300472003
+  3223424681
+  2895699896
+  3241119570
+  1147453440
+  3135402609
+  521763744
+  2702966971
+  2878317616
+  845031697
+  2855454471
+  3051902539
+  482306439
+  129173738
+  306572138
+  2941951538
+  762707233
+  2732608168
+  1228456939
+  246020724
+  1920607602
+  14434245
+  1254943537
+  1520746602
+  150745124
+  1350160351
+  38707222
+  267766858
+  2992121760
+  712666764
+  983036864
+  289490939
+  269797384
+  100215048
+  3099557245
+  2339741570
+  306005146
+  1182227460
+  288235870
+  1412832260
+  455190443
+  489912183
+  448994061
+  2944595072
+  2453094914
+  2899434206
+  59288818
+  2824706688
+  423363992
+  972850482
+  997868714
+  1203750733
+  176147179
+  115110596
+  2978397615
+  2528946267
+  620180433
+  365949935
+  110609853
+  1533494268
+  2723839166
+  34186887
+  2864430424
+  76942977
+  361086733
+  2724200587
+  635206139
+  2757801421
+  19651443
+  3364322949
+  2770576744
+  2168612560
+  764020297
+  2558268513
+  2855384901
+  1881414907
+  2502212139
+  3250037586
+  2525185944
+  591375982
+  707911211
+  3025041666
+  19785599
+  2311172950
+  922817815
+  739363530
+  2812894393
+  2496283986
+  206162815
+  590916342
+  354053245
+  2735195854
+  2788759128
+  3510947235
+  3490740532
+  2920847304
+  2681444558
+  2856805755
+  3103899682
+  145893832
+  3065663910
+  2736009516
+  2835226230
+  1590913771
+  2700889555
+  2221272164
+  109780161
+  700221218
+  541753453
+  126575915
+  274336817
+  2498172455
+  2809515630
+  2588774684
+  296734891
+  2212410182
+  243027454
+  1336526904
+  397062736
+  449331876
+  30619307
+  2310483811
+  2437586509
+  191710730
+  1084185378
+  2831486681
+  1606477879
+  969600636
+  529783214
+  2928131586
+  190041293
+  2967031274
+  2165962781
+  376501355
+  284137985
+  266863824
+  407944074
+  108456036
+  1641294422
+  900733706
+  1063071450
+  1682722328
+  341419520
+  1644293778
+  2245151467
+  511176989
+  241922669
+  3388315624
+  1909431145
+  2223820028
+  600581315
+  1723555076
+  2748445313
+  561211823
+  561022931
+  2751429993
+  2714908343
+  16165257
+  524623359
+  306741266
+  469994381
+  2561892084
+  998802661
+  1492924374
+  789039140
+  210150093
+  817544820
+  35740178
+  326162841
+  1447331628
+  17493441
+  2874693608
+  965027312
+  261936985
+  510564259
+  728031187
+  164696234
+  2204519310
+  1626241164
+  1024940588
+  221486613
+  571084565
+  3029264508
+  221716563
+  2211417135
+  499972359
+  1565989165
+  2436927208
+  381029291
+  2730580620
+  3436438413
+  2466014604
+  538990742
+  2935470687
+  1162845468
+  468108082
+  2383897542
+  2542119658
+  1962281514
+  171235080
+  536915535100125185
+  2841076618
+  3006098500
+  1057158554
+  3245676721
+  251087536
+  3082811549
+  281785349
+  1674871100
+  1898659951
+  1414854156
+  428693618
+  2385953101
+  2281213477
+  2786368894
+  2253203998
+  357277727
+  1358707970
+  545186198
+  3033613587
+  107121821
+  595965259
+  583894637
+  1306698787
+  442262869
+  2868353318
+  1908436844
+  271982042
+  495202171
+  251586884
+  3151032974
+  2213682568
+  1203133039
+  193128957
+  597407120
+  2781102086
+  369254505
+  62831036
+  2328734640
+  2579064082
+  3271313827
+  2880366619
+  2323026113
+  446380518
+  245418139
+  261211664
+  1893329208
+  3406596309
+  584967077
+  1708862304
+  388961426
+  2421535351
+  2194375668
+  2790313673
+  2728894977
+  2829174824
+  784541196
+  959902393
+  249705367
+  1677679309
+  2825975175
+  1305768366
+  373475046
+  785362464
+  419607671
+  61031675
+  3854236343
+  714603248
+  1301447720
+  827660912
+  2383764684
+  3180084906
+  3265558124
+  608536922
+  238943561
+]
\ No newline at end of file


[6/8] incubator-streams git commit: more main methods: STREAMS-411, better thread tracking: STREAMS-425, misc cleanup

Posted by sb...@apache.org.
more main methods: STREAMS-411, better thread tracking: STREAMS-425, misc cleanup

more main methods: STREAMS-411
better thread tracking: STREAMS-425
misc cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/170cb8b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/170cb8b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/170cb8b6

Branch: refs/heads/master
Commit: 170cb8b6b9d647dc2b7ff82b87edf060f078585c
Parents: f1540b1
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Oct 6 14:01:04 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Oct 6 14:01:04 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowingProvider.java      | 120 +++++++---
 .../twitter/provider/TwitterStreamProvider.java |  55 +++++
 .../provider/TwitterTimelineProvider.java       | 191 ++++++++--------
 .../TwitterUserInformationProvider.java         | 227 ++++++++++++-------
 .../twitter/TwitterFollowingConfiguration.json  |   2 +-
 5 files changed, 386 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4c3a828..66c1104 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -18,22 +18,43 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -49,6 +70,49 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
     private TwitterFollowingConfiguration config;
 
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
+        TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+
+        ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     public TwitterFollowingConfiguration getConfig()              { return config; }
 
     public static final int MAX_NUMBER_WAITING = 10000;
@@ -63,14 +127,24 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     }
 
     @Override
+    public void prepare(Object o) {
+        super.prepare(config);
+        Preconditions.checkNotNull(getConfig().getEndpoint());
+        Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+        return;
+    }
+
+    @Override
     public void startStream() {
 
-        running.set(true);
+        Preconditions.checkNotNull(executor);
 
         Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
 
         LOGGER.info("startStream");
 
+        running.set(true);
+
         while (idsBatches.hasNext()) {
             submitFollowingThreads(idsBatches.next());
         }
@@ -78,8 +152,6 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
             submitFollowingThreads(screenNameBatches.next());
         }
 
-        running.set(true);
-
         executor.shutdown();
 
     }
@@ -89,7 +161,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
         for (int i = 0; i < ids.length; i++) {
             TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
-            executor.submit(providerTask);
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", ids[i]);
         }
     }
 
@@ -98,7 +172,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
         for (int i = 0; i < screenNames.length; i++) {
             TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
-            executor.submit(providerTask);
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", screenNames[i]);
         }
 
     }
@@ -120,41 +196,17 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
             lock.writeLock().unlock();
         }
 
-        if (providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
-            running.set(false);
-
-            LOGGER.info("Exiting");
-        }
-
         return result;
 
     }
 
-    protected Queue<StreamsDatum> constructQueue() {
-        return new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    @Override
-    public void prepare(Object o) {
-        super.prepare(config);
-        Preconditions.checkNotNull(getConfig().getEndpoint());
-        Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
-        return;
-    }
-
-    public void addDatum(StreamsDatum datum) {
-        try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
     @Override
     public boolean isRunning() {
+        if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            running.set(false);
+            LOGGER.info("Exiting");
+        }
         return running.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index f584950..b414074 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,9 +18,12 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
 import com.twitter.hbc.core.Hosts;
@@ -35,7 +38,11 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.BasicAuth;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.DatumStatusCountable;
@@ -43,14 +50,21 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -72,6 +86,47 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
+        TwitterStreamProvider provider = new TwitterStreamProvider(config);
+
+        ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     public static final int MAX_BATCH = 1000;
 
     private TwitterStreamConfiguration config;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2924623..cea9829 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -68,6 +72,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
 /**
  *  Retrieve recent posts from a list of user ids or names.
  *
@@ -91,7 +97,39 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
-    private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+    public static final int MAX_NUMBER_WAITING = 10000;
+
+    private TwitterUserInformationConfiguration config;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public TwitterUserInformationConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(TwitterUserInformationConfiguration config) {
+        this.config = config;
+    }
+
+    protected Collection<String[]> screenNameBatches;
+    protected Collection<Long> ids;
+
+    protected volatile Queue<StreamsDatum> providerQueue;
+
+    protected int idsCount;
+    protected Twitter client;
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    Boolean jsonStoreEnabled;
+    Boolean includeEntitiesEnabled;
 
     public static void main(String[] args) throws Exception {
 
@@ -111,6 +149,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
         TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
 
+        ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
         PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
         provider.prepare(config);
         provider.startStream();
@@ -121,7 +161,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 StreamsDatum datum = iterator.next();
                 String json;
                 try {
-                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    json = mapper.writeValueAsString(datum.getDocument());
                     outStream.println(json);
                 } catch (JsonProcessingException e) {
                     System.err.println(e.getMessage());
@@ -132,42 +172,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         outStream.flush();
     }
 
-    public static final int MAX_NUMBER_WAITING = 10000;
-
-    private TwitterUserInformationConfiguration config;
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public TwitterUserInformationConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(TwitterUserInformationConfiguration config) {
-        this.config = config;
-    }
-
-    protected Collection<String[]> screenNameBatches;
-    protected Collection<Long> ids;
-
-    protected volatile Queue<StreamsDatum> providerQueue;
-
-    protected int idsCount;
-    protected Twitter client;
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    Boolean jsonStoreEnabled;
-    Boolean includeEntitiesEnabled;
-
-    private static ExecutorService getExecutor() {
-        return Executors.newSingleThreadExecutor();
-    }
-
     public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
         this.config = config;
     }
@@ -182,17 +186,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     @Override
+    public void prepare(Object o) {
+
+
+
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+        Preconditions.checkNotNull(config.getInfo());
+
+        consolidateToIDs();
+
+        if(ids.size() > 1)
+            executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
+        else
+            executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+    }
+
+    @Override
     public void startStream() {
+
         LOGGER.debug("{} startStream", STREAMS_ID);
 
         Preconditions.checkArgument(!ids.isEmpty());
 
-        LOGGER.debug("{} - readCurrent", ids);
+        running.set(true);
 
         submitTimelineThreads(ids.toArray(new Long[0]));
 
-        running.set(true);
-
         executor.shutdown();
 
     }
@@ -202,13 +232,15 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     protected void submitTimelineThreads(Long[] ids) {
+
         Twitter client = getTwitterClient();
 
         for(int i = 0; i < ids.length; i++) {
 
             TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
-            executor.submit(providerTask);
-
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", ids[i]);
         }
 
     }
@@ -242,7 +274,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             lock.writeLock().unlock();
         }
 
-        if( providerQueue.isEmpty() && executor.isTerminated()) {
+        if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
             LOGGER.info("Finished.  Cleaning up...");
 
             running.set(false);
@@ -268,50 +300,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         throw new NotImplementedException();
     }
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
 
-        executor = getExecutor();
-
-        try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getInfo());
-
-        consolidateToIDs();
-    }
 
     /**
      * Using the "info" list that is contained in the configuration, ensure that all
@@ -375,13 +364,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         shutdownAndAwaitTermination(executor);
     }
 
-    public void addDatum(StreamsDatum datum) {
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
         try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
         }
     }
 
+    @Override
+    public boolean isRunning() {
+        if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            running.set(false);
+            LOGGER.info("Exiting");
+        }
+        return running.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 44f8a24..d6e783b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,13 +18,20 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -45,6 +52,10 @@ import twitter4j.TwitterFactory;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -75,6 +86,45 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     private TwitterUserInformationConfiguration config;
 
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+        TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config);
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     protected volatile Queue<StreamsDatum> providerQueue;
@@ -93,7 +143,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     protected final AtomicBoolean running = new AtomicBoolean();
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+    public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
@@ -117,8 +167,88 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
 
     @Override
+    public void prepare(Object o) {
+
+        if( o instanceof TwitterFollowingConfiguration )
+            config = (TwitterUserInformationConfiguration) o;
+
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getOauth());
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+        Preconditions.checkNotNull(config.getInfo());
+
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        Preconditions.checkNotNull(providerQueue);
+
+        List<String> screenNames = new ArrayList<String>();
+        List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+        List<Long> ids = new ArrayList<Long>();
+        List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+        for(String s : config.getInfo()) {
+            if(s != null)
+            {
+                String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+                // See if it is a long; if it is, add it to the user ID list; if it is not, add it to the
+                // screen name list
+                try {
+                    ids.add(Long.parseLong(potentialScreenName));
+                } catch (Exception e) {
+                    screenNames.add(potentialScreenName);
+                }
+
+                // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+                if(ids.size() >= 100) {
+                    // add the batch
+                    idsBatches.add(ids.toArray(new Long[ids.size()]));
+                    // reset the Ids
+                    ids = new ArrayList<Long>();
+                }
+
+                if(screenNames.size() >= 100) {
+                    // add the batch
+                    screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+                    // reset the screen names
+                    screenNames = new ArrayList<String>();
+                }
+            }
+        }
+
+
+        if(ids.size() > 0)
+            idsBatches.add(ids.toArray(new Long[ids.size()]));
+
+        if(screenNames.size() > 0)
+            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+
+        if(ids.size() + screenNames.size() > 0)
+            executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+        else
+            executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+
+        Preconditions.checkNotNull(executor);
+
+        this.idsBatches = idsBatches.iterator();
+        this.screenNameBatches = screenNameBatches.iterator();
+    }
+
+    @Override
     public void startStream() {
 
+        Preconditions.checkNotNull(executor);
+
         Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
 
         LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
@@ -214,16 +344,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             lock.writeLock().unlock();
         }
 
-        if( providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
-            running.set(false);
-
-            LOGGER.info("Exiting");
-        }
-
-        return result;
-
     }
 
     protected Queue<StreamsDatum> constructQueue() {
@@ -246,6 +366,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     @Override
     public boolean isRunning() {
+
+        if( providerQueue.isEmpty() && executor.isTerminated() ) {
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
+
+            running.set(false);
+
+            LOGGER.info("Exiting");
+        }
+
         return running.get();
     }
 
@@ -267,78 +396,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         }
     }
 
-    @Override
-    public void prepare(Object o) {
 
-        if( o instanceof TwitterFollowingConfiguration )
-            config = (TwitterUserInformationConfiguration) o;
-
-        try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getInfo());
-
-        List<String> screenNames = new ArrayList<String>();
-        List<String[]> screenNameBatches = new ArrayList<String[]>();
-
-        List<Long> ids = new ArrayList<Long>();
-        List<Long[]> idsBatches = new ArrayList<Long[]>();
-
-        for(String s : config.getInfo()) {
-            if(s != null)
-            {
-                String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
-
-                // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
-                // screen name list
-                try {
-                    ids.add(Long.parseLong(potentialScreenName));
-                } catch (Exception e) {
-                    screenNames.add(potentialScreenName);
-                }
-
-                // Twitter allows for batches up to 100 per request, but you cannot mix types
-
-                if(ids.size() >= 100) {
-                    // add the batch
-                    idsBatches.add(ids.toArray(new Long[ids.size()]));
-                    // reset the Ids
-                    ids = new ArrayList<Long>();
-                }
-
-                if(screenNames.size() >= 100) {
-                    // add the batch
-                    screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-                    // reset the Ids
-                    screenNames = new ArrayList<String>();
-                }
-            }
-        }
-
-
-        if(ids.size() > 0)
-            idsBatches.add(ids.toArray(new Long[ids.size()]));
-
-        if(screenNames.size() > 0)
-            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-
-        if(ids.size() + screenNames.size() > 0)
-            executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
-        else
-            executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
-
-        this.idsBatches = idsBatches.iterator();
-        this.screenNameBatches = screenNameBatches.iterator();
-    }
 
     protected Twitter getTwitterClient()
     {
@@ -359,6 +417,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         return new TwitterFactory(builder.build()).getInstance();
     }
 
+    protected void callback() {
+
+
+    }
+
     @Override
     public void cleanUp() {
         shutdownAndAwaitTermination(executor);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
index c72f3cf..dda5d1b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
@@ -12,7 +12,7 @@
         "ids_only": {
             "type": "boolean",
             "description": "Whether to collect ids only, or full profiles",
-            "value": "true"
+            "default": "true"
         }
     }
 }
\ No newline at end of file



[8/8] incubator-streams git commit: add 3 more provider ITs, reorganize test packages

Posted by sb...@apache.org.
add 3 more provider ITs, reorganize test packages


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4febde27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4febde27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4febde27

Branch: refs/heads/master
Commit: 4febde277d428fe0a3fcd9de55b7eaa3899cf4d0
Parents: 170cb8b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Oct 6 17:56:40 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Oct 6 17:56:40 2016 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterStreamProvider.java |   11 +-
 .../TwitterUserInformationProvider.java         |    2 +
 .../provider/TwitterTimelineProviderIT.java     |   64 --
 .../test/TwitterActivityConvertersTest.java     |   86 --
 .../TwitterActivityObjectsConvertersTest.java   |   55 -
 .../test/TwitterDocumentClassifierTest.java     |   88 --
 .../twitter/test/TwitterObjectMapperIT.java     |  132 ---
 .../test/data/TwitterObjectMapperIT.java        |  132 +++
 .../providers/TwitterFollowingProviderIT.java   |   52 +
 .../test/providers/TwitterStreamProviderIT.java |   64 ++
 .../providers/TwitterTimelineProviderIT.java    |   52 +
 .../TwitterUserInformationProviderIT.java       |   52 +
 .../utils/TwitterActivityConvertersTest.java    |   86 ++
 .../TwitterActivityObjectsConvertersTest.java   |   47 +
 .../utils/TwitterDocumentClassifierTest.java    |   88 ++
 .../resources/TwitterFollowingProviderIT.conf   |    8 +
 .../test/resources/TwitterStreamProviderIT.conf |    6 +
 .../TwitterUserInformationProviderIT.conf       | 1002 ++++++++++++++++++
 18 files changed, 1600 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index b414074..eac1218 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
@@ -86,7 +87,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
 
         Preconditions.checkArgument(args.length >= 2);
 
@@ -106,7 +107,13 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
         ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        PrintStream outStream = null;
+        try {
+            outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        } catch (FileNotFoundException e) {
+            LOGGER.error("FileNotFoundException", e);
+            return;
+        }
         provider.prepare(config);
         provider.startStream();
         do {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index d6e783b..15ff791 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -344,6 +344,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             lock.writeLock().unlock();
         }
 
+        return result;
+
     }
 
     protected Queue<StreamsDatum> constructQueue() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
deleted file mode 100644
index f21a87e..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.LineNumberReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderIT {
-
-    @Test
-    public void testTwitterTimelineProvider() throws Exception {
-
-        String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
-        String outfile = "./target/test-classes/TwitterTimelineProviderIT.txt";
-
-        TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
-
-        File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
-
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
-
-        while(outCounter.readLine() != null) {}
-
-        assert (outCounter.getLineNumber() == 1000);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
deleted file mode 100644
index 5e0473b..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterUtil;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.converter.*}
- */
-public class TwitterActivityConvertersTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
-    private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
-
-    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
 ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
-    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
 /profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
 e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
 \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
 ":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
-    private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
-    private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
-
-    @Test
-    public void testConvertTweet() {
-        List<Activity> activityList = activityConverterUtil.convert(tweet);
-        Assert.assertTrue(activityList.size() == 1);
-        Activity activity = activityList.get(0);
-        if( !ActivityUtil.isValid(activity) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testConvertRetweet() {
-        List<Activity> activityList = activityConverterUtil.convert(retweet);
-        Assert.assertTrue(activityList.size() == 1);
-        Activity activity = activityList.get(0);
-        if( !ActivityUtil.isValid(activity) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testConvertDelete() {
-        List<Activity> activityList = activityConverterUtil.convert(delete);
-        Assert.assertTrue(activityList.size() == 1);
-        Activity activity = activityList.get(0);
-        if( !ActivityUtil.isValid(activity) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testConvertFollow() {
-        List<Activity> activityList = activityConverterUtil.convert(follow);
-        Assert.assertTrue(activityList.size() == 1);
-        Activity activity = activityList.get(0);
-        if( !ActivityUtil.isValid(activity) )
-            Assert.fail();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
deleted file mode 100644
index 4a663e2..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterUtil;
-import org.apache.streams.converter.ActivityObjectConverterUtil;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.converter.*}
- */
-public class TwitterActivityObjectsConvertersTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class);
-
-    private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance();
-
-    private String user = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_
 url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\
 "urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/ai\u2026\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n";
-
-    @Test
-    public void testConvertUser() {
-        ActivityObject activityObject = activityObjectConverterUtil.convert(user);
-        assert( activityObject != null );
-        if( !ActivityUtil.isValid(activityObject) )
-            Assert.fail();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
deleted file mode 100644
index 044fe3c..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.processor.TwitterEventClassifier}
- */
-public class TwitterDocumentClassifierTest {
-
-    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
 ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
-    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
 /profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
 e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
 \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
 ":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
-    private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
-    private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
-    private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl
 y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
-
-    @Test
-    public void testDetectTweet() {
-        List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Tweet.class) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testDetectRetweet() {
-        List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Retweet.class) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testDetectDelete() {
-        List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Delete.class) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testDetectFollow() {
-        List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Follow.class) )
-            Assert.fail();
-    }
-
-    @Test
-    public void testDetectUser() {
-        List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if (!result.equals(User.class))
-            Assert.fail();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
deleted file mode 100644
index e8bbf49..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
-
-/**
-* Tests serialization / deserialization of twitter jsons
-*/
-public class TwitterObjectMapperIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        int tweetlinks = 0;
-        int retweetlinks = 0;
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    LOGGER.info("raw: {}", line);
-
-                    Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
-
-                    ObjectNode event = (ObjectNode) mapper.readTree(line);
-
-                    assertThat(event, is(not(nullValue())));
-
-                    if( detected == Tweet.class ) {
-
-                        Tweet tweet = mapper.convertValue(event, Tweet.class);
-
-                        assertThat(tweet, is(not(nullValue())));
-                        assertThat(tweet.getCreatedAt(), is(not(nullValue())));
-                        assertThat(tweet.getText(), is(not(nullValue())));
-                        assertThat(tweet.getUser(), is(not(nullValue())));
-
-                        tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
-
-                    } else if( detected == Retweet.class ) {
-
-                        Retweet retweet = mapper.convertValue(event, Retweet.class);
-
-                        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
-                        assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue())));
-                        assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue())));
-                        assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue())));
-                        assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
-                        assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
-
-                        retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
-
-                    } else if( detected == Delete.class ) {
-
-                        Delete delete = mapper.convertValue(event, Delete.class);
-
-                        assertThat(delete.getDelete(), is(not(nullValue())));
-                        assertThat(delete.getDelete().getStatus(), is(not(nullValue())));
-                        assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue())));
-                        assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue())));
-
-                    } else {
-                        Assert.fail();
-                    }
-
-                }
-            }
-        } catch( Exception e ) {
-            LOGGER.error("Exception: ", e);
-            Assert.fail();
-        }
-
-        assertThat(tweetlinks, is(greaterThan(0)));
-        assertThat(retweetlinks, is(greaterThan(0)));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
new file mode 100644
index 0000000..4d6a3de
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.data;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+/**
+* Tests serialization / deserialization of twitter jsons
+*/
+public class TwitterObjectMapperIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        int tweetlinks = 0;
+        int retweetlinks = 0;
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                if(!StringUtils.isEmpty(line))
+                {
+                    LOGGER.info("raw: {}", line);
+
+                    Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
+
+                    ObjectNode event = (ObjectNode) mapper.readTree(line);
+
+                    assertThat(event, is(not(nullValue())));
+
+                    if( detected == Tweet.class ) {
+
+                        Tweet tweet = mapper.convertValue(event, Tweet.class);
+
+                        assertThat(tweet, is(not(nullValue())));
+                        assertThat(tweet.getCreatedAt(), is(not(nullValue())));
+                        assertThat(tweet.getText(), is(not(nullValue())));
+                        assertThat(tweet.getUser(), is(not(nullValue())));
+
+                        tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
+
+                    } else if( detected == Retweet.class ) {
+
+                        Retweet retweet = mapper.convertValue(event, Retweet.class);
+
+                        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
+
+                        retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
+
+                    } else if( detected == Delete.class ) {
+
+                        Delete delete = mapper.convertValue(event, Delete.class);
+
+                        assertThat(delete.getDelete(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue())));
+
+                    } else {
+                        Assert.fail();
+                    }
+
+                }
+            }
+        } catch( Exception e ) {
+            LOGGER.error("Exception: ", e);
+            Assert.fail();
+        }
+
+        assertThat(tweetlinks, is(greaterThan(0)));
+        assertThat(retweetlinks, is(greaterThan(0)));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
new file mode 100644
index 0000000..558bb7c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterFollowingProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterFollowingProviderIT {
+
+    @Test
+    public void testTwitterFollowingProvider() throws Exception {
+
+        String configfile = "./target/test-classes/TwitterFollowingProviderIT.conf";
+        String outfile = "./target/test-classes/TwitterFollowingProviderIT.stdout.txt";
+
+        TwitterFollowingProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() == 10000);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
new file mode 100644
index 0000000..880f5df
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterStreamProviderIT {
+
+    final String outfile = "./target/test-classes/TwitterStreamProviderIT.stdout.txt";
+
+    /** Runs TwitterStreamProvider.main on a background thread, waits up to 30s, then checks the output file. */
+    @Test
+    public void testTwitterStreamProvider() throws Exception {
+
+        Thread testThread = new Thread(
+            new Runnable() {
+
+                String configfile = "./target/test-classes/TwitterStreamProviderIT.conf";
+
+                @Override
+                public void run() {
+                    TwitterStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+                }
+            }
+        );
+        testThread.start();
+        // Bounded wait: the checks below run after at most 30 seconds, whether or not main() has returned.
+        testThread.join(30000);
+
+        File out = new File(outfile);
+        // JUnit assertions instead of the assert keyword: they still run when the JVM lacks -ea.
+        org.junit.Assert.assertTrue("missing output file " + outfile, out.exists());
+        org.junit.Assert.assertTrue("unreadable output file " + outfile, out.canRead());
+        org.junit.Assert.assertTrue("not a regular file " + outfile, out.isFile());
+
+        // try-with-resources closes the reader (and the wrapped FileReader) even if the check fails.
+        try (LineNumberReader outCounter = new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {}
+            org.junit.Assert.assertTrue("expected more than 25 lines", outCounter.getLineNumber() > 25);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..9e26528
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterTimelineProviderIT {
+
+    /** Runs TwitterTimelineProvider.main with the IT config and checks the line count of the file it wrote. */
+    @Test
+    public void testTwitterTimelineProvider() throws Exception {
+
+        String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
+        String outfile = "./target/test-classes/TwitterTimelineProviderIT.stdout.txt";
+
+        TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        // JUnit assertions instead of the assert keyword: they still run when the JVM lacks -ea.
+        org.junit.Assert.assertTrue("missing output file " + outfile, out.exists());
+        org.junit.Assert.assertTrue("unreadable output file " + outfile, out.canRead());
+        org.junit.Assert.assertTrue("not a regular file " + outfile, out.isFile());
+
+        // try-with-resources closes the reader (and the wrapped FileReader) even if the check fails.
+        try (LineNumberReader outCounter = new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {}
+            org.junit.Assert.assertEquals(1000, outCounter.getLineNumber());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
new file mode 100644
index 0000000..e489f64
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterUserInformationProviderIT {
+
+    /** Runs TwitterUserInformationProvider.main with the IT config and checks the line count of its output. */
+    @Test
+    public void testTwitterUserInformationProvider() throws Exception {
+
+        String configfile = "./target/test-classes/TwitterUserInformationProviderIT.conf";
+        String outfile = "./target/test-classes/TwitterUserInformationProviderIT.stdout.txt";
+
+        TwitterUserInformationProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        // JUnit assertions instead of the assert keyword: they still run when the JVM lacks -ea.
+        org.junit.Assert.assertTrue("missing output file " + outfile, out.exists());
+        org.junit.Assert.assertTrue("unreadable output file " + outfile, out.canRead());
+        org.junit.Assert.assertTrue("not a regular file " + outfile, out.isFile());
+
+        // try-with-resources closes the reader (and the wrapped FileReader) even if the check fails.
+        try (LineNumberReader outCounter = new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {}
+            org.junit.Assert.assertTrue("expected more than 750 lines", outCounter.getLineNumber() > 750);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
new file mode 100644
index 0000000..755a98e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.converter.ActivityConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Tests the converters in package {@code org.apache.streams.twitter.converter}.
+ */
+public class TwitterActivityConvertersTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
+    // Mapper obtained with the Twitter date format; no test in this class references it.
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+    // Shared converter entry point that every test in this class calls.
+    private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
+
+    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
 ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
 /profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
 e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
 \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
 ":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+    private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
+    private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
+
+    /** Converts the sample tweet JSON and expects exactly one activity that passes validation. */
+    @Test
+    public void testConvertTweet() {
+        final List<Activity> converted = activityConverterUtil.convert(tweet);
+        Assert.assertTrue(1 == converted.size());
+        final Activity first = converted.get(0);
+        Assert.assertTrue(ActivityUtil.isValid(first));
+    }
+
+    /** Converts the sample retweet JSON and expects exactly one activity that passes validation. */
+    @Test
+    public void testConvertRetweet() {
+        final List<Activity> converted = activityConverterUtil.convert(retweet);
+        Assert.assertTrue(1 == converted.size());
+        final Activity first = converted.get(0);
+        Assert.assertTrue(ActivityUtil.isValid(first));
+    }
+
+    /** Converts the sample delete JSON and expects exactly one activity that passes validation. */
+    @Test
+    public void testConvertDelete() {
+        final List<Activity> converted = activityConverterUtil.convert(delete);
+        Assert.assertTrue(1 == converted.size());
+        final Activity first = converted.get(0);
+        Assert.assertTrue(ActivityUtil.isValid(first));
+    }
+
+    /** Converts the sample follow JSON and expects exactly one activity that passes validation. */
+    @Test
+    public void testConvertFollow() {
+        final List<Activity> converted = activityConverterUtil.convert(follow);
+        Assert.assertTrue(1 == converted.size());
+        final Activity first = converted.get(0);
+        Assert.assertTrue(ActivityUtil.isValid(first));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
new file mode 100644
index 0000000..11cd1e0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import org.apache.streams.converter.ActivityObjectConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the converters in package {@code org.apache.streams.twitter.converter}.
+ */
+public class TwitterActivityObjectsConvertersTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class);
+    // Converter entry point exercised by testConvertUser.
+    private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance();
+
+    private String user = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_
 url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\
 "urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/ai\u2026\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n";
+
+    /** Converts the sample user JSON; expects a non-null, valid result (JUnit checks, so no -ea is needed). */
+    @Test
+    public void testConvertUser() {
+        ActivityObject activityObject = activityObjectConverterUtil.convert(user);
+        Assert.assertNotNull(activityObject);
+        Assert.assertTrue(ActivityUtil.isValid(activityObject));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
new file mode 100644
index 0000000..a1ca7c5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests {@link org.apache.streams.twitter.converter.TwitterDocumentClassifier}
+ */
+public class TwitterDocumentClassifierTest {
+
+    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
 ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
 /profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
 e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
 \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
 ":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+    private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
+    private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
+    private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl
 y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
+
+    @Test
+    public void testDetectTweet() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if( !result.equals(Tweet.class) )
+            Assert.fail();
+    }
+
+    @Test
+    public void testDetectRetweet() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if( !result.equals(Retweet.class) )
+            Assert.fail();
+    }
+
+    @Test
+    public void testDetectDelete() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if( !result.equals(Delete.class) )
+            Assert.fail();
+    }
+
+    @Test
+    public void testDetectFollow() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if( !result.equals(Follow.class) )
+            Assert.fail();
+    }
+
+    @Test
+    public void testDetectUser() {
+        List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
+        Assert.assertTrue(detected.size() == 1);
+        Class result = detected.get(0);
+        if (!result.equals(User.class))
+            Assert.fail();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
new file mode 100644
index 0000000..378978a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
@@ -0,0 +1,8 @@
+twitter {
+  info = [
+    18055613
+  ]
+  endpoint = followers
+  ids_only = true
+  max_items = 10000
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
new file mode 100644
index 0000000..291a17c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
@@ -0,0 +1,6 @@
+twitter {
+  endpoint = sample
+  track = [
+    "data"
+  ]
+}


[4/8] incubator-streams git commit: fixes while testing flink examples

Posted by sb...@apache.org.
fixes while testing flink examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9495cf52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9495cf52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9495cf52

Branch: refs/heads/master
Commit: 9495cf52e3d1c5d5100566364bfa30447555682a
Parents: d9e58cd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:41:48 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:41:48 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowingProvider.java      |  9 ++--
 .../provider/TwitterFollowingProviderTask.java  | 52 +++++++++-----------
 .../provider/TwitterStreamProcessor.java        |  5 +-
 .../TwitterUserInformationProvider.java         |  6 ++-
 4 files changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 27c8526..4c3a828 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -88,7 +88,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
         Twitter client = getTwitterClient();
 
         for (int i = 0; i < ids.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
             executor.submit(providerTask);
         }
     }
@@ -97,7 +97,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
         Twitter client = getTwitterClient();
 
         for (int i = 0; i < screenNames.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
             executor.submit(providerTask);
         }
 
@@ -106,7 +106,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
     @Override
     public StreamsResultSet readCurrent() {
 
-        LOGGER.debug("Providing {} docs", providerQueue.size());
+        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
 
         StreamsResultSet result;
 
@@ -115,12 +115,13 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
             result = new StreamsResultSet(providerQueue);
             result.setCounter(new DatumStatusCounter());
             providerQueue = constructQueue();
+            LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
         } finally {
             lock.writeLock().unlock();
         }
 
         if (providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("Finished.  Cleaning up...");
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
             running.set(false);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index cc71d48..f2346fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -44,27 +44,20 @@ public class TwitterFollowingProviderTask implements Runnable {
     protected TwitterFollowingProvider provider;
     protected Twitter client;
     protected Long id;
-    protected Boolean idsOnly;
     protected String screenName;
-    protected String endpoint;
 
-    private int max_per_page = 200;
     int count = 0;
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint, Boolean idsOnly) {
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
         this.provider = provider;
         this.client = twitter;
         this.id = id;
-        this.endpoint = endpoint;
-        this.idsOnly = idsOnly;
     }
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint, Boolean idsOnly) {
+    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
         this.provider = provider;
         this.client = twitter;
         this.screenName = screenName;
-        this.endpoint = endpoint;
-        this.idsOnly = idsOnly;
     }
 
 
@@ -84,9 +77,9 @@ public class TwitterFollowingProviderTask implements Runnable {
 
     protected void getFollowing(Long id) {
 
-        Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+        Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
 
-        if( idsOnly )
+        if( provider.getConfig().getIdsOnly() )
             collectIds(id);
         else
             collectUsers(id);
@@ -112,10 +105,10 @@ public class TwitterFollowingProviderTask implements Runnable {
                 }
 
                 PagableResponseList<twitter4j.User> list = null;
-                if( endpoint.equals("followers") )
-                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
-                else if( endpoint.equals("friends") )
-                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+                if( provider.getConfig().getEndpoint().equals("followers") )
+                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+                else if( provider.getConfig().getEndpoint().equals("friends") )
+                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
 
                 Preconditions.checkNotNull(list);
                 Preconditions.checkArgument(list.size() > 0);
@@ -126,11 +119,11 @@ public class TwitterFollowingProviderTask implements Runnable {
 
                     try {
                         Follow follow = null;
-                        if( endpoint.equals("followers") ) {
+                        if( provider.getConfig().getEndpoint().equals("followers") ) {
                             follow = new Follow()
                                     .withFollowee(mapper.readValue(userJson, User.class))
                                     .withFollower(mapper.readValue(otherJson, User.class));
-                        } else if( endpoint.equals("friends") ) {
+                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
                             follow = new Follow()
                                     .withFollowee(mapper.readValue(otherJson, User.class))
                                     .withFollower(mapper.readValue(userJson, User.class));
@@ -147,9 +140,9 @@ public class TwitterFollowingProviderTask implements Runnable {
                         LOGGER.warn("Exception: {}", e);
                     }
                 }
-                if( list.size() == max_per_page )
-                    curser = list.getNextCursor();
-                else break;
+                if( !list.hasNext() ) break;
+                if( list.getNextCursor() == 0 ) break;
+                curser = list.getNextCursor();
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -170,10 +163,10 @@ public class TwitterFollowingProviderTask implements Runnable {
             try
             {
                 twitter4j.IDs ids = null;
-                if( endpoint.equals("followers") )
-                    ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, max_per_page);
-                else if( endpoint.equals("friends") )
-                    ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, max_per_page);
+                if( provider.getConfig().getEndpoint().equals("followers") )
+                    ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+                else if( provider.getConfig().getEndpoint().equals("friends") )
+                    ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
 
                 Preconditions.checkNotNull(ids);
                 Preconditions.checkArgument(ids.getIDs().length > 0);
@@ -182,16 +175,15 @@ public class TwitterFollowingProviderTask implements Runnable {
 
                     try {
                         Follow follow = null;
-                        if( endpoint.equals("followers") ) {
+                        if( provider.getConfig().getEndpoint().equals("followers") ) {
                             follow = new Follow()
                                     .withFollowee(new User().withId(id))
                                     .withFollower(new User().withId(otherId));
-                        } else if( endpoint.equals("friends") ) {
+                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
                             follow = new Follow()
                                     .withFollowee(new User().withId(otherId))
                                     .withFollower(new User().withId(id));
                         }
-                        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
 
                         Preconditions.checkNotNull(follow);
 
@@ -203,9 +195,9 @@ public class TwitterFollowingProviderTask implements Runnable {
                         LOGGER.warn("Exception: {}", e);
                     }
                 }
-                if( ids.hasNext() )
-                    curser = ids.getNextCursor();
-                else break;
+                if( !ids.hasNext() ) break;
+                if( ids.getNextCursor() == 0 ) break;
+                curser = ids.getNextCursor();
             }
             catch(TwitterException twitterException) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index f0690f8..8ea65eb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -88,8 +88,9 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
         @Override
         public List<StreamsDatum> call() throws Exception {
             if(item != null) {
-                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+                Class itemClass = TwitterEventClassifier.detectClass(item);
+                Object document = mapper.readValue(item, itemClass);
+                StreamsDatum rawDatum = new StreamsDatum(document);
                 return Lists.newArrayList(rawDatum);
             }
             return Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 78eb3e6..4231f56 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -197,7 +197,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     public StreamsResultSet readCurrent() {
 
-        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+        LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
 
         StreamsResultSet result;
 
@@ -206,7 +206,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             result = new StreamsResultSet(providerQueue);
             result.setCounter(new DatumStatusCounter());
             providerQueue = constructQueue();
-            LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+            LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
         } finally {
             lock.writeLock().unlock();
         }
@@ -215,6 +215,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
             LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
             running.set(false);
+
+            LOGGER.info("Exiting");
         }
 
         return result;


[5/8] incubator-streams git commit: per PR feedback, don’t use import .*;

Posted by sb...@apache.org.
per PR feedback, don’t use import .*;


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1540b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1540b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1540b15

Branch: refs/heads/master
Commit: f1540b15c2dcbb6c99cd9be90cbe2626b8ef49fc
Parents: 9495cf5
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 18:06:00 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 18:06:00 2016 -0500

----------------------------------------------------------------------
 .../TwitterJsonDeleteActivityConverter.java     |  2 +-
 .../TwitterJsonRetweetActivityConverter.java    |  2 +-
 .../TwitterJsonTweetActivityConverter.java      |  2 +-
 ...terJsonUserstreameventActivityConverter.java |  3 ++-
 .../FetchAndReplaceTwitterProcessor.java        | 11 ++++++---
 .../processor/TwitterEventProcessor.java        | 13 +----------
 .../twitter/provider/TwitterConfigurator.java   |  8 +++----
 .../provider/TwitterEventClassifier.java        |  7 +++++-
 .../twitter/provider/TwitterStreamProvider.java | 24 +++++++++++++++-----
 .../provider/TwitterTimelineProvider.java       | 16 +++++++++----
 .../provider/TwitterTimelineProviderTask.java   |  6 +++--
 .../TwitterUserInformationProvider.java         |  7 ++++--
 .../test/TwitterDocumentClassifierTest.java     |  6 ++++-
 .../twitter/test/TwitterObjectMapperIT.java     |  5 ++--
 14 files changed, 69 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
index eb35b71..3e61ef9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
@@ -30,7 +30,7 @@ import org.apache.streams.twitter.pojo.Tweet;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
index 22a4a58..30a1916 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
@@ -28,7 +28,7 @@ import org.apache.streams.twitter.pojo.Retweet;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
index 0b9f0ec..0997a7f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
@@ -28,7 +28,7 @@ import org.apache.streams.twitter.pojo.Tweet;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
index 357c41c..b3647fa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
@@ -30,7 +30,8 @@ import org.apache.streams.twitter.pojo.UserstreamEvent;
 
 import java.util.List;
 
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.formatId;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 8c5f55b..8330167 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -28,21 +28,26 @@ import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterConfiguration;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterConfigurator;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
 import org.apache.streams.twitter.provider.TwitterProviderUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.util.List;
 
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 /**
  *  Given an Activity, fetches the tweet by the activity object id and replaces the existing activity with the converted activity

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 45f84ff..ed6b90a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -18,27 +18,16 @@
 
 package org.apache.streams.twitter.processor;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.*;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 
 /**
  * This class performs conversion of a twitter event to a specified outClass

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
index 370960b..4b7b3ef 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
@@ -19,17 +19,15 @@
 package org.apache.streams.twitter.provider;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.*;
+import org.apache.streams.twitter.TwitterConfiguration;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * This class resolves TwitterConfiguration from typesafe config

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index f54dd0a..9466c2e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -24,8 +24,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.*;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
 
 import java.io.IOException;
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 30e2b56..f584950 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
@@ -26,7 +25,11 @@ import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
 import com.twitter.hbc.core.Hosts;
 import com.twitter.hbc.core.HttpHosts;
-import com.twitter.hbc.core.endpoint.*;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.BasicAuth;
@@ -34,7 +37,12 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
@@ -45,10 +53,14 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * TwitterStreamProvider wraps a hosebird client and passes recieved documents

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 61cddaf..2924623 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -38,22 +37,29 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.User;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index b8d5e1d..111d213 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -23,12 +23,14 @@ import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.*;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Paging;
 import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 4231f56..44f8a24 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -21,7 +21,6 @@ package org.apache.streams.twitter.provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.NotImplementedException;
@@ -52,7 +51,11 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
index 1bf3691..044fe3c 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
@@ -19,7 +19,11 @@
 package org.apache.streams.twitter.test;
 
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.*;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
index 4da2af2..e8bbf49 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
@@ -31,7 +31,6 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +39,9 @@ import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertThat;