You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:03 UTC

[1/8] git commit: Added user information and made some other modifications to increase the readability.

Repository: incubator-streams
Updated Branches:
  refs/heads/master e6ffe29e8 -> ae27541e0


Added user information and made some other modifications to increase the readability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/741a4544
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/741a4544
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/741a4544

Branch: refs/heads/master
Commit: 741a45445ca2e6ad49f18c5ddd151c04de58fb6f
Parents: e6ffe29
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri May 2 12:47:35 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri May 2 12:47:35 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            |  10 +-
 .../provider/TwitterStreamConfigurator.java     |  62 +++-
 .../twitter/provider/TwitterStreamProvider.java |   1 -
 .../provider/TwitterTimelineProvider.java       | 119 +++-----
 .../provider/TwitterTimelineProviderTask.java   |   7 -
 .../TwitterUserInformationProvider.java         | 286 +++++++++++++++++++
 .../com/twitter/TwitterConfiguration.json       |  70 +++++
 .../com/twitter/TwitterStreamConfiguration.json |  61 +---
 .../TwitterUserInformationConfiguration.json    |  17 ++
 .../streams/twitter/test/SimpleTweetTest.java   |   4 +
 10 files changed, 472 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 3c27b8c..8a41ca5 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>streams-config</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-util</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -73,9 +68,8 @@
         <dependency>
             <groupId>org.twitter4j</groupId>
             <artifactId>twitter4j-core</artifactId>
-            <version>3.0.5</version>
+            <version>[4.0,)</version>
         </dependency>
-
     </dependencies>
 
     <build>
@@ -118,7 +112,9 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
+                        <sourcePath>src/main/jsonschema/com/twitter/TwitterConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 9bf2d9a..5435f24 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -1,15 +1,15 @@
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterBasicAuthConfiguration;
-import org.apache.streams.twitter.TwitterOAuthConfiguration;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -18,19 +18,18 @@ import java.util.List;
 public class TwitterStreamConfigurator {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class);
+    private final static ObjectMapper mapper = new ObjectMapper();
 
-    public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
 
-        TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
-
-        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+    public static TwitterConfiguration detectTwitterConfiguration(Config config) {
+        TwitterConfiguration twitterConfiguration = new TwitterConfiguration();
 
         try {
             Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
             TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration();
             twitterBasicAuthConfiguration.setUsername(basicauth.getString("username"));
             twitterBasicAuthConfiguration.setPassword(basicauth.getString("password"));
-            twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration);
+            twitterConfiguration.setBasicauth(twitterBasicAuthConfiguration);
         } catch( ConfigException ce ) {}
 
         try {
@@ -40,27 +39,60 @@ public class TwitterStreamConfigurator {
             twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
             twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
             twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
-            twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+            twitterConfiguration.setOauth(twitterOAuthConfiguration);
         } catch( ConfigException ce ) {}
 
+        twitterConfiguration.setEndpoint(config.getString("endpoint"));
+
+        return twitterConfiguration;
+    }
+
+    public static TwitterStreamConfiguration detectConfiguration(Config config) {
+
+        TwitterStreamConfiguration twitterStreamConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterStreamConfiguration.class);
+
         try {
-            twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
+            twitterStreamConfiguration.setTrack(config.getStringList("track"));
         } catch( ConfigException ce ) {}
 
         try {
+            // create the array
             List<Long> follows = Lists.newArrayList();
-            for( Integer id : twitter.getIntList("follow"))
-                follows.add(new Long(id));
+            // add the ids of the people we want to 'follow'
+            for(Integer id : config.getIntList("follow"))
+                follows.add((long)id);
+            // set the array
             twitterStreamConfiguration.setFollow(follows);
+
         } catch( ConfigException ce ) {}
 
-        twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
-        twitterStreamConfiguration.setWith(twitter.getString("with"));
-        twitterStreamConfiguration.setReplies(twitter.getString("replies"));
+        twitterStreamConfiguration.setFilterLevel(config.getString("filter-level"));
+        twitterStreamConfiguration.setWith(config.getString("with"));
+        twitterStreamConfiguration.setReplies(config.getString("replies"));
         twitterStreamConfiguration.setJsonStoreEnabled("true");
         twitterStreamConfiguration.setIncludeEntities("true");
 
         return twitterStreamConfiguration;
     }
 
+    /**
+     * Builds the user-information configuration from the shared Twitter settings plus the optional "info" list.
+     *
+     * @param config the configuration section to read "endpoint" and "info" from
+     * @return the populated configuration; "info" is left untouched when the path is absent or cannot be read
+     */
+    public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) {
+        TwitterUserInformationConfiguration twitterUserInformationConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class);
+
+        try {
+            if (config.hasPath("info")) {
+                // The list used to be built and then dropped; it has to be stored on the configuration to be usable.
+                twitterUserInformationConfiguration.setInfo(new ArrayList<String>(config.getStringList("info")));
+            }
+        } catch (Exception e) {
+            LOGGER.error("There was an error: {}", e.getMessage(), e);
+        }
+        return twitterUserInformationConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 3df7d02..b1785e5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -1,6 +1,5 @@
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b9551ad..2c39cf9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -1,11 +1,7 @@
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -49,17 +45,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         this.config = config;
     }
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+    protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500));
 
-    protected Twitter client;
+    protected int idsCount;
     protected Iterator<Long> ids;
 
-    ListenableFuture providerTaskComplete;
-//
-//    public BlockingQueue<Object> getInQueue() {
-//        return inQueue;
-//    }
-
     protected ListeningExecutorService executor;
 
     protected DateTime start;
@@ -74,6 +64,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public TwitterTimelineProvider() {
         Config config = StreamsConfigurator.config.getConfig("twitter");
         this.config = TwitterStreamConfigurator.detectConfiguration(config);
+
     }
 
     public TwitterTimelineProvider(TwitterStreamConfiguration config) {
@@ -95,43 +86,19 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         return this.providerQueue;
     }
 
-//    public void run() {
-//
-//        LOGGER.info("{} Running", STREAMS_ID);
-//
-//        while( ids.hasNext() ) {
-//            Long currentId = ids.next();
-//            LOGGER.info("Provider Task Starting: {}", currentId);
-//            captureTimeline(currentId);
-//        }
-//
-//        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
-//
-//        client.shutdown();
-//
-//        LOGGER.info("{} Exiting", STREAMS_ID);
-//
-//        while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
-//            try {
-//                Thread.sleep(100);
-//            } catch (InterruptedException e) {}
-//        }
-//    }
-
     @Override
     public void startStream() {
         // no op
     }
 
-    private void captureTimeline(long currentId) {
+    protected void captureTimeline(long currentId) {
 
         Paging paging = new Paging(1, 200);
         List<Status> statuses = null;
-        boolean KeepGoing = true;
-        boolean hadFailure = false;
 
         do
         {
+            Twitter client = getTwitterClient();
             int keepTrying = 0;
 
             // keep trying to load, give it 5 attempts.
@@ -143,20 +110,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 {
                     statuses = client.getUserTimeline(currentId, paging);
 
-                    for (Status tStat : statuses)
-                    {
-//                        if( provider.start != null &&
-//                                provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
-//                        {
-//                            // they hit the last date we wanted to collect
-//                            // we can now exit early
-//                            KeepGoing = false;
-//                        }
-                        // emit the record
-                        String json = DataObjectFactory.getRawJSON(tStat);
-
-                        providerQueue.offer(new StreamsDatum(json));
+                    for (Status tStat : statuses) {
+                        String json = TwitterObjectFactory.getRawJSON(tStat);
 
+                        while(!providerQueue.offer(new StreamsDatum(json))) {
+                            sleep();
+                        }
                     }
 
                     paging.setPage(paging.getPage() + 1);
@@ -166,19 +125,36 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 catch(TwitterException twitterException) {
                     keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
                 }
-                catch(Exception e)
-                {
-                    hadFailure = true;
+                catch(Exception e) {
                     keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
                 }
-                finally
-                {
-                    // Shutdown the twitter to release the resources
-                    client.shutdown();
-                }
             }
         }
-        while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+        while (shouldContinuePulling(statuses));
+    }
+
+    private Map<Long, Long> userPullInfo;
+
+    protected boolean shouldContinuePulling(List<Status> statuses) {
+        return statuses != null && !statuses.isEmpty(); // continue only while the last page held statuses
+    }
+
+    /**
+     * Backs off briefly: a sleep of 0 or 1 ms surrounded by yields.
+     */
+    private void sleep() {
+        Thread.yield();
+        try {
+            Thread.yield();
+            // nextInt(2) yields 0 or 1, so the argument is never negative and Thread.sleep cannot reject it
+            Thread.sleep(new Random().nextInt(2));
+            Thread.yield();
+        }
+        catch(InterruptedException e) {
+            // keep the interrupt visible to the caller instead of silently clearing it
+            Thread.currentThread().interrupt();
+        }
+        Thread.yield();
+    }
 
     public StreamsResultSet readCurrent() {
@@ -244,21 +220,19 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
         Preconditions.checkNotNull(providerQueue);
-
         Preconditions.checkNotNull(this.klass);
-
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());
         Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
         Preconditions.checkNotNull(config.getOauth().getAccessToken());
         Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
         Preconditions.checkNotNull(config.getFollow());
 
-        Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
+        idsCount = config.getFollow().size();
         ids = config.getFollow().iterator();
+    }
 
+    protected Twitter getTwitterClient()
+    {
         String baseUrl = "https://api.twitter.com:443/1.1/";
 
         ConfigurationBuilder builder = new ConfigurationBuilder()
@@ -266,23 +240,18 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
                 .setOAuthAccessToken(config.getOauth().getAccessToken())
                 .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                .setJSONStoreEnabled(jsonStoreEnabled)
+                .setIncludeEntitiesEnabled(true)
+                .setJSONStoreEnabled(true)
                 .setAsyncNumThreads(3)
                 .setRestBaseURL(baseUrl)
                 .setIncludeMyRetweetEnabled(Boolean.TRUE)
-                .setIncludeRTsEnabled(Boolean.TRUE)
                 .setPrettyDebugEnabled(Boolean.TRUE);
 
-        client = new TwitterFactory(builder.build()).getInstance();
-
+        return new TwitterFactory(builder.build()).getInstance();
     }
 
     @Override
     public void cleanUp() {
-
-        client.shutdown();
-
         shutdownAndAwaitTermination(executor);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 9619f4f..9a1d4e7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -74,19 +74,12 @@ public class TwitterTimelineProviderTask implements Runnable {
                     hadFailure = true;
                     keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e);
                 }
-                finally
-                {
-                    // Shutdown the twitter to release the resources
-                    twitter.shutdown();
-                }
             }
         }
         while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
 
         LOGGER.info("Provider Finished.  Cleaning up...");
 
-        twitter.shutdown();
-
         LOGGER.info("Provider Exiting");
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
new file mode 100644
index 0000000..dac5cd6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -0,0 +1,286 @@
+package org.apache.streams.twitter.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import twitter4j.*;
+import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+public class TwitterUserInformationProvider implements StreamsProvider, Serializable
+{
+
+    public static final String STREAMS_ID = "TwitterUserInformationProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
+
+
+    private TwitterUserInformationConfiguration twitterUserInformationConfiguration;
+
+    private Class klass;
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    public TwitterUserInformationConfiguration getConfig()              { return twitterUserInformationConfiguration; }
+
+    public void setConfig(TwitterUserInformationConfiguration config)   { this.twitterUserInformationConfiguration = config; }
+
+    protected Iterator<Long[]> idsBatches;
+    protected Iterator<String[]> screenNameBatches;
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public TwitterUserInformationProvider() {
+        Config config = StreamsConfigurator.config.getConfig("twitter");
+        this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config);
+
+    }
+
+    public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) {
+        this.twitterUserInformationConfiguration = config;
+    }
+
+    public TwitterUserInformationProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("twitter");
+        this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config);
+        this.klass = klass;
+    }
+
+    public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) {
+        this.twitterUserInformationConfiguration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+        // no op
+    }
+
+
+    /**
+     * Looks up one batch of users by numeric id and queues each user's raw JSON as a StreamsDatum.
+     *
+     * @param ids the user ids sent in a single lookup request
+     */
+    private void loadBatch(Long[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
+
+        // The loop ends once the counter reaches 1: success forces it to 10, a failure adds whatever handleTwitterError returns.
+        while (keepTrying < 1) {
+            try {
+                long[] toQuery = new long[ids.length];
+                for (int i = 0; i < ids.length; i++) {
+                    toQuery[i] = ids[i];
+                }
+                for (User user : client.lookupUsers(toQuery)) {
+                    // TwitterObjectFactory is the factory TwitterTimelineProvider uses with twitter4j 4.x
+                    providerQueue.offer(new StreamsDatum(TwitterObjectFactory.getRawJSON(user)));
+                }
+                keepTrying = 10;
+            } catch (TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            } catch (Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        }
+    }
+
+    /**
+     * Looks up one batch of users by screen name and queues each user's raw JSON as a StreamsDatum.
+     *
+     * @param ids the screen names sent in a single lookup request
+     */
+    private void loadBatch(String[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
+
+        // The loop ends once the counter reaches 1: success forces it to 10, a failure adds whatever handleTwitterError returns.
+        while (keepTrying < 1) {
+            try {
+                for (User user : client.lookupUsers(ids)) {
+                    // TwitterObjectFactory is the factory TwitterTimelineProvider uses with twitter4j 4.x
+                    providerQueue.offer(new StreamsDatum(TwitterObjectFactory.getRawJSON(user)));
+                }
+                keepTrying = 10;
+            } catch (TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+            } catch (Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+            }
+        }
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+        LOGGER.info("readCurrent");
+
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
+
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
+
+
+        LOGGER.info("Finished.  Cleaning up...");
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+
+        LOGGER.info("Exiting");
+
+        return result;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    /** Records the requested window and returns what {@link #readCurrent()} collects; the window is only stored here, not applied as a filter. */
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        // readCurrent() already wraps the queue in a result set; casting the queue's Iterator to StreamsResultSet could never succeed
+        return readCurrent();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Validates the configuration and splits the configured "info" entries into lookup batches.
+     * Entries that parse as a long are treated as user ids; every other entry is treated as a screen name.
+     *
+     * @param o not used by this provider
+     */
+    @Override
+    public void prepare(Object o) {
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken());
+        Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret());
+        Preconditions.checkNotNull(twitterUserInformationConfiguration.getInfo());
+
+        // Twitter allows for batches up to 100 per request, but you cannot mix types
+        final int maxBatchSize = 100;
+        List<String> pendingScreenNames = new ArrayList<String>();
+        List<String[]> collectedScreenNameBatches = new ArrayList<String[]>();
+
+        List<Long> pendingIds = new ArrayList<Long>();
+        List<Long[]> collectedIdBatches = new ArrayList<Long[]>();
+
+        for (String entry : twitterUserInformationConfiguration.getInfo()) {
+            if (entry == null) {
+                continue;
+            }
+            String candidate = entry.replaceAll("@", "").trim().toLowerCase();
+            try {
+                pendingIds.add(Long.parseLong(candidate));
+            } catch (NumberFormatException notAnId) {
+                // not numeric, so it is handled as a screen name
+                pendingScreenNames.add(candidate);
+            }
+
+            if (pendingIds.size() >= maxBatchSize) {
+                collectedIdBatches.add(pendingIds.toArray(new Long[pendingIds.size()]));
+                pendingIds = new ArrayList<Long>();
+            }
+            if (pendingScreenNames.size() >= maxBatchSize) {
+                collectedScreenNameBatches.add(pendingScreenNames.toArray(new String[pendingScreenNames.size()]));
+                pendingScreenNames = new ArrayList<String>();
+            }
+        }
+
+        // Flush the final, partially filled batches. Each array is sized from its own list: sizing the
+        // screen-name array from the id count produced null-padded batches whenever more ids were left over.
+        if (!pendingIds.isEmpty()) {
+            collectedIdBatches.add(pendingIds.toArray(new Long[pendingIds.size()]));
+        }
+        if (!pendingScreenNames.isEmpty()) {
+            collectedScreenNameBatches.add(pendingScreenNames.toArray(new String[pendingScreenNames.size()]));
+        }
+
+        this.idsBatches = collectedIdBatches.iterator();
+        this.screenNameBatches = collectedScreenNameBatches.iterator();
+    }
+
+    protected Twitter getTwitterClient()
+    {
+        String baseUrl = "https://api.twitter.com:443/1.1/";
+
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+                .setOAuthConsumerKey(twitterUserInformationConfiguration.getOauth().getConsumerKey())
+                .setOAuthConsumerSecret(twitterUserInformationConfiguration.getOauth().getConsumerSecret())
+                .setOAuthAccessToken(twitterUserInformationConfiguration.getOauth().getAccessToken())
+                .setOAuthAccessTokenSecret(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret())
+                .setIncludeEntitiesEnabled(true)
+                .setJSONStoreEnabled(true)
+                .setAsyncNumThreads(3)
+                .setRestBaseURL(baseUrl)
+                .setIncludeMyRetweetEnabled(Boolean.TRUE)
+                .setPrettyDebugEnabled(Boolean.TRUE);
+
+        return new TwitterFactory(builder.build()).getInstance();
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
new file mode 100644
index 0000000..9e22b93
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
@@ -0,0 +1,70 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "jsonStoreEnabled": {
+            "default" : true,
+            "type": "string"
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appName": {
+                    "type": "string"
+                },
+                "consumerKey": {
+                    "type": "string"
+                },
+                "consumerSecret": {
+                    "type": "string"
+                },
+                "accessToken": {
+                    "type": "string"
+                },
+                "accessTokenSecret": {
+                    "type": "string"
+                }
+            }
+        },
+        "basicauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "username": {
+                    "type": "string"
+                },
+                "password": {
+                    "type": "string"
+                }
+            }
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
index c1a0d0c..2ff7362 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
@@ -3,34 +3,12 @@
     "$schema": "http://json-schema.org/draft-03/schema",
     "id": "#",
     "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port"
-        },
-        "version": {
-            "type": "string",
-            "description": "The version"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
         "includeEntities": {
             "type": "string"
         },
-        "jsonStoreEnabled": {
-            "type": "string"
-        },
         "truncated": {
             "type": "boolean"
         },
@@ -59,43 +37,6 @@
             "items": {
                 "type": "string"
             }
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "appName": {
-                    "type": "string"
-                },
-                "consumerKey": {
-                    "type": "string"
-                },
-                "consumerSecret": {
-                    "type": "string"
-                },
-                "accessToken": {
-                    "type": "string"
-                },
-                "accessTokenSecret": {
-                    "type": "string"
-                }
-            }
-        },
-        "basicauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "username": {
-                    "type": "string"
-                },
-                "password": {
-                    "type": "string"
-                }
-            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
new file mode 100644
index 0000000..afd203f
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index b8bfe1a..31ddfce 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
@@ -21,6 +23,8 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**


[4/8] git commit: Sorry, this isn't even being used.

Posted by sb...@apache.org.
Sorry, this isn't even being used.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4f30bd2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4f30bd2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4f30bd2a

Branch: refs/heads/master
Commit: 4f30bd2a8cec2869765fa875d303e98492242b7b
Parents: 651be79
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 16:37:53 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 16:37:53 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/s3/S3PersistWriterTask.java  | 37 --------------------
 1 file changed, 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4f30bd2a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
deleted file mode 100644
index d791c87..0000000
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.streams.s3;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class S3PersistWriterTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriterTask.class);
-
-    private S3PersistWriter writer;
-
-    public S3PersistWriterTask(S3PersistWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void run() {
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
-        }
-
-    }
-
-}


[8/8] git commit: Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.

Posted by sb...@apache.org.
Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ae27541e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ae27541e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ae27541e

Branch: refs/heads/master
Commit: ae27541e08674f4db6996e065516b32b8fe0f45d
Parents: d1018e9
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 18:05:09 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 18:05:09 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistReaderTask.java  | 16 ++--------------
 .../java/org/apache/streams/s3/S3PersistWriter.java |  1 -
 .../twitter/provider/TwitterTimelineProvider.java   | 14 ++++++--------
 .../provider/TwitterUserInformationProvider.java    |  3 ++-
 4 files changed, 10 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 9967216..73763e6 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.s3;
 import com.google.common.base.Strings;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class S3PersistReaderTask implements Runnable {
                         reader.countersCurrent.incrementAttempt();
                         String[] fields = line.split(Character.toString(reader.DELIMITER));
                         StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
-                        write( entry );
+                        ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
                         reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                     }
                 }
@@ -81,17 +82,4 @@ public class S3PersistReaderTask implements Runnable {
             LOGGER.error("There was an issue closing file: {}", file);
         }
     }
-
-
-    private void write( StreamsDatum entry ) {
-        boolean success;
-        do {
-            synchronized( S3PersistReader.class ) {
-                success = reader.persistQueue.offer(entry);
-            }
-            Thread.yield();
-        }
-        while( !success );
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 98671ba..058f748 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -62,7 +62,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     }};
 
     private OutputStreamWriter currentWriter = null;
-    protected volatile Queue<StreamsDatum> persistQueue;
 
     public AmazonS3Client getAmazonS3Client() {
         return this.amazonS3Client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2c39cf9..b456fa4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,7 +2,6 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
@@ -11,18 +10,21 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by sblackmon on 12/10/13.
@@ -105,17 +107,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             //while (keepTrying < 10)
             while (keepTrying < 1)
             {
-
                 try
                 {
                     statuses = client.getUserTimeline(currentId, paging);
 
                     for (Status tStat : statuses) {
                         String json = TwitterObjectFactory.getRawJSON(tStat);
-
-                        while(!providerQueue.offer(new StreamsDatum(json))) {
-                            sleep();
-                        }
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
                     }
 
                     paging.setPage(paging.getPage() + 1);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 04aa1fe..049c3bb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
                 for (User tStat : client.lookupUsers(toQuery)) {
                     String json = DataObjectFactory.getRawJSON(tStat);
-                    providerQueue.offer(new StreamsDatum(json));
+                    ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
                 }
                 keepTrying = 10;
             }


[5/8] git commit: Changes based on feedback.

Posted by sb...@apache.org.
Changes based on feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1a03d5f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1a03d5f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1a03d5f6

Branch: refs/heads/master
Commit: 1a03d5f67ae0b634f903733dd14a26236331690c
Parents: 4f30bd2
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:13:06 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:13:06 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3Configurator.java   |   1 +
 .../streams/s3/S3ObjectInputStreamWrapper.java  | 108 ++++++++++++-------
 .../streams/s3/S3OutputStreamWrapper.java       |  67 ++++++------
 .../org/apache/streams/s3/S3PersistReader.java  |  42 +++++---
 .../apache/streams/s3/S3PersistReaderTask.java  |  19 ++--
 .../org/apache/streams/s3/S3PersistWriter.java  |   5 +-
 6 files changed, 140 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index 8190404..3413ef7 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -24,6 +24,7 @@ public class S3Configurator {
 
         if(!(protocol.equals("https") || protocol.equals("http"))) {
             // you must specify either HTTP or HTTPS
+            throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol");
         }
 
         s3Configuration.setProtocol(protocol.toLowerCase());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index 2a2dba0..900ebfb 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -9,28 +9,15 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-
 /**
- * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over
- * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read
- * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will
- * exhaust the stream and transfer a ton of data attempting to do so.
- *
+ * There is a nuance associated with reading portions of files in S3. Everything occurs over
+ * an Apache HTTP client object. Apache and therefore Amazon defaults to re-using the stream.
+ * As a result, if you only intend to read a small portion of the file, you must first "abort" the
+ * stream, then close the 'inputStream'. Otherwise, Apache will exhaust the entire stream
+ * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file
+ * this becomes problematic.
  *
- * Author   Smashew
- * Date     2014-04-11
- *
- * After a few more days and some demos that had some issues with concurrency and high user load. This
- * was also discovered. There is an issue with the S3Object's HTTP connection not being released back
- * to the connection pool (until it times out) even once the item is garbage collected. So....
+ * This class operates as a wrapper to fix the aforementioned nuances.
  *
  * Reference:
  * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
@@ -43,39 +30,82 @@ public class S3ObjectInputStreamWrapper extends InputStream
     private final S3ObjectInputStream is;
     private boolean isClosed = false;
 
+    /**
+     * Create an input stream that safely wraps the content of the given S3 object.
+     * @param s3Object the S3 object whose content stream is wrapped
+     */
     public S3ObjectInputStreamWrapper(S3Object s3Object) {
         this.s3Object = s3Object;
         this.is = this.s3Object.getObjectContent();
     }
 
-    public int hashCode()                                           { return this.is.hashCode(); }
-    public boolean equals(Object obj)                               { return this.is.equals(obj); }
-    public String toString()                                        { return this.is.toString(); }
-    public int read() throws IOException                            { return this.is.read(); }
-    public int read(byte[] b) throws IOException                    { return this.is.read(b); }
-    public int read(byte[] b, int off, int len) throws IOException  { return this.is.read(b, off, len); }
-    public long skip(long n) throws IOException                     { return this.is.skip(n); }
-    public int available() throws IOException                       { return this.is.available(); }
-    public boolean markSupported()                                  { return this.is.markSupported(); }
-    public synchronized void mark(int readlimit)                    { this.is.mark(readlimit); }
-    public synchronized void reset() throws IOException             { this.is.reset(); }
+    public int hashCode() {
+        return this.is.hashCode();
+    }
+
+    public boolean equals(Object obj) {
+        return this.is.equals(obj);
+    }
+
+    public String toString() {
+        return this.is.toString();
+    }
+
+    public int read() throws IOException {
+        return this.is.read();
+    }
+
+    public int read(byte[] b) throws IOException {
+        return this.is.read(b);
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        return this.is.read(b, off, len);
+    }
+
+    public long skip(long n) throws IOException {
+        return this.is.skip(n);
+    }
+
+    public int available() throws IOException {
+        return this.is.available();
+    }
+
+    public boolean markSupported() {
+        return this.is.markSupported();
+    }
+
+    public synchronized void mark(int readlimit) {
+        this.is.mark(readlimit);
+    }
+
+    public synchronized void reset() throws IOException {
+        this.is.reset();
+    }
 
     public void close() throws IOException {
         ensureEverythingIsReleased();
     }
 
-    public void ensureEverythingIsReleased()
-    {
+    public void ensureEverythingIsReleased() {
         if(this.isClosed)
             return;
 
-        // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION!
-        // Amazon S3 - HTTP Exhaust all file contents issue
+
         try {
+            // ensure that the S3 Object is closed properly.
+            this.s3Object.close();
+        } catch(Throwable e) {
+            LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
+        }
+
+
+        try {
+            // Abort the stream
             this.is.abort();
         }
-        catch(Exception e) {
-            LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage());
+        catch(Throwable e) {
+            LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
         }
 
         // close the input Stream Safely
@@ -98,8 +128,8 @@ public class S3ObjectInputStreamWrapper extends InputStream
 
     protected void finalize( ) throws Throwable
     {
-        try
-        {
+        try {
+            // If there is an accidental leak where the user did not close the stream, release everything from the finalizer
             ensureEverythingIsReleased();
             super.finalize();
         } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index 8f55983..c488b48 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -9,33 +9,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.io.FilenameUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Date;
 import java.util.Map;
 
 /**
- *
- * Author:  Smashew
- * Date:    2014-04-14
- *
- * Description:
- * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly.
- *
- * There is a way to upload data in chunks (5mb or so) a peice, but the multi-part upload
- * is kind of a PITA to deal with.
- *
- * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones
+ * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. The stream is written to the
+ * in-memory ByteArrayOutputStream before it is finally written to Amazon S3. The size the file is allowed to become
+ * is directly controlled by the S3PersistWriter.
  */
 public class S3OutputStreamWrapper extends OutputStream
 {
@@ -47,11 +26,24 @@ public class S3OutputStreamWrapper extends OutputStream
     private final String fileName;
     private ByteArrayOutputStream outputStream;
     private final Map<String, String> metaData;
-
     private boolean isClosed = false;
 
-    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException
-    {
+    /**
+     * Create an OutputStream Wrapper
+     * @param amazonS3Client
+     * The Amazon S3 Client which will be handling the file
+     * @param bucketName
+     * The Bucket Name you are wishing to write to.
+     * @param path
+     * The path where the object will live
+     * @param fileName
+     * The fileName you are wishing to write.
+     * @param metaData
+     * Any meta data that is to be written along with the object
+     * @throws IOException
+     * If there is an issue creating the stream.
+     */
+    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
         this.amazonS3Client = amazonS3Client;
         this.bucketName = bucketName;
         this.path = path;
@@ -60,14 +52,21 @@ public class S3OutputStreamWrapper extends OutputStream
         this.outputStream = new ByteArrayOutputStream();
     }
 
-    /*
-     * The Methods that are overriden to support the 'OutputStream' object.
-     */
+    public void write(int b) throws IOException {
+        this.outputStream.write(b);
+    }
 
-    public void write(int b) throws IOException                         { this.outputStream.write(b); }
-    public void write(byte[] b) throws IOException                      { this.outputStream.write(b); }
-    public void write(byte[] b, int off, int len) throws IOException    { this.outputStream.write(b, off, len); }
-    public void flush() throws IOException                              { this.outputStream.flush(); }
+    public void write(byte[] b) throws IOException {
+        this.outputStream.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        this.outputStream.write(b, off, len);
+    }
+
+    public void flush() throws IOException {
+        this.outputStream.flush();
+    }
 
     /**
      * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index a987a47..938dc66 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -15,11 +15,7 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigInteger;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Queue;
@@ -43,13 +39,33 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     protected DatumStatusCounter countersTotal = new DatumStatusCounter();
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
 
-    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client; }
-    public S3ReaderConfiguration getS3ReaderConfiguration()             { return this.s3ReaderConfiguration; }
-    public String getBucketName()                                       { return this.s3ReaderConfiguration.getBucket(); }
-    public StreamsResultSet readNew(BigInteger sequence)                { return null; }
-    public StreamsResultSet readRange(DateTime start, DateTime end)     { return null; }
-    public DatumStatusCounter getDatumStatusCounter()                   { return countersTotal; }
-    public Collection<String> getFiles()                                { return this.files; }
+    public AmazonS3Client getAmazonS3Client() {
+        return this.amazonS3Client;
+    }
+
+    public S3ReaderConfiguration getS3ReaderConfiguration() {
+        return this.s3ReaderConfiguration;
+    }
+
+    public String getBucketName() {
+        return this.s3ReaderConfiguration.getBucket();
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    public DatumStatusCounter getDatumStatusCounter() {
+        return countersTotal;
+    }
+
+    public Collection<String> getFiles() {
+        return this.files;
+    }
 
     public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
         this.s3ReaderConfiguration = s3ReaderConfiguration;
@@ -111,7 +127,9 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
         this.executor = Executors.newSingleThreadExecutor();
     }
 
-    public void cleanUp() { }
+    public void cleanUp() {
+        // no Op
+    }
 
     public StreamsResultSet readAll() {
         startStream();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 70015fb..5b4abe4 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -23,20 +23,17 @@ public class S3PersistReaderTask implements Runnable {
     @Override
     public void run() {
 
-        for(String file : reader.getFiles())
-        {
-            // Create our buffered reader
+        for(String file : reader.getFiles()) {
 
+            // Create our buffered reader
             S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
             LOGGER.info("Reading: {} ", file);
 
             String line = "";
             try {
-                while((line = bufferedReader.readLine()) != null)
-                {
-                    if( !Strings.isNullOrEmpty(line) )
-                    {
+                while((line = bufferedReader.readLine()) != null) {
+                    if( !Strings.isNullOrEmpty(line) ) {
                         reader.countersCurrent.incrementAttempt();
                         String[] fields = line.split(Character.toString(reader.DELIMITER));
                         StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
@@ -44,9 +41,7 @@ public class S3PersistReaderTask implements Runnable {
                         reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                     }
                 }
-            }
-            catch (Exception e)
-            {
+            } catch (Exception e) {
                 e.printStackTrace();
                 LOGGER.warn(e.getMessage());
                 reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
@@ -57,7 +52,6 @@ public class S3PersistReaderTask implements Runnable {
             try {
                 closeSafely(file, is);
             } catch (Exception e) {
-                e.printStackTrace();
                 LOGGER.error(e.getMessage());
             }
         }
@@ -66,8 +60,7 @@ public class S3PersistReaderTask implements Runnable {
     private static void closeSafely(String file, Closeable closeable) {
         try {
             closeable.close();
-        }
-        catch(Exception e) {
+        } catch(Exception e) {
             LOGGER.error("There was an issue closing file: {}", file);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index c46ff03..3685012 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -9,7 +9,6 @@ import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.streams.core.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -188,7 +187,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         }
     }
 
-
     private String convertResultToString(StreamsDatum entry)
     {
         String metadata = null;
@@ -227,8 +225,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
                 this.objectMapper = new ObjectMapper();
 
             // Create the credentials Object
-            if(this.amazonS3Client == null)
-            {
+            if(this.amazonS3Client == null) {
                 AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
 
                 ClientConfiguration clientConfig = new ClientConfiguration();


[7/8] git commit: Added licenses, and more code formatting to comply with styles.

Posted by sb...@apache.org.
Added licenses, and more code formatting to comply with styles.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d1018e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d1018e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d1018e90

Branch: refs/heads/master
Commit: d1018e9070d8976ac3175f2b89af91c933d711f9
Parents: 361d122
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:32:35 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:32:35 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-s3/pom.xml                  | 17 +++++
 .../org/apache/streams/s3/S3Configurator.java   | 17 +++++
 .../streams/s3/S3ObjectInputStreamWrapper.java  | 17 +++++
 .../streams/s3/S3OutputStreamWrapper.java       | 17 +++++
 .../org/apache/streams/s3/S3PersistReader.java  | 17 +++++
 .../apache/streams/s3/S3PersistReaderTask.java  | 17 +++++
 .../org/apache/streams/s3/S3PersistWriter.java  | 76 +++++++++++++-------
 .../TwitterUserInformationProvider.java         | 24 ++++++-
 8 files changed, 175 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
index 4e9b9b1..5cadd5c 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
@@ -1,4 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index 3413ef7..dfa0426 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.fasterxml.jackson.databind.ObjectMapper;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index 900ebfb..c13314d 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.amazonaws.services.s3.model.S3Object;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index c488b48..08fc774 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.amazonaws.services.s3.AmazonS3Client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 938dc66..5c7413e 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.amazonaws.ClientConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 5b4abe4..9967216 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.google.common.base.Strings;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 3685012..98671ba 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.s3;
 
 import com.amazonaws.ClientConfiguration;
@@ -47,14 +64,33 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     private OutputStreamWriter currentWriter = null;
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client; }
-    public S3WriterConfiguration getS3WriterConfiguration()             { return this.s3WriterConfiguration; }
-    public List<String> getWrittenFiles()                               { return this.writtenFiles; }
-    public Map<String, String> getObjectMetaData()                      { return this.objectMetaData; }
-    public ObjectMapper getObjectMapper()                               { return this.objectMapper; }
+    public AmazonS3Client getAmazonS3Client() {
+        return this.amazonS3Client;
+    }
+
+    public S3WriterConfiguration getS3WriterConfiguration() {
+        return this.s3WriterConfiguration;
+    }
+
+    public List<String> getWrittenFiles() {
+        return this.writtenFiles;
+    }
 
-    public void setObjectMapper(ObjectMapper mapper)                    { this.objectMapper = mapper; }
-    public void setObjectMetaData(Map<String, String> val)              { this.objectMetaData = val; }
+    public Map<String, String> getObjectMetaData() {
+        return this.objectMetaData;
+    }
+
+    public ObjectMapper getObjectMapper() {
+        return this.objectMapper;
+    }
+
+    public void setObjectMapper(ObjectMapper mapper) {
+        this.objectMapper = mapper;
+    }
+
+    public void setObjectMetaData(Map<String, String> val) {
+        this.objectMetaData = val;
+    }
 
     /**
      * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
@@ -75,15 +111,13 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        synchronized (this)
-        {
+        synchronized (this) {
             // Check to see if we need to reset the file that we are currently working with
             if (this.currentWriter == null || ( this.bytesWrittenThisFile.get()  >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
                 try {
                     LOGGER.info("Resetting the file");
                     this.currentWriter = resetFile();
-                }
-                catch (Exception e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
@@ -108,8 +142,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
     }
 
-    private synchronized OutputStreamWriter resetFile() throws Exception
-    {
+    private synchronized OutputStreamWriter resetFile() throws Exception {
         // this will keep it thread safe, so we don't create too many files
         if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
             return this.currentWriter;
@@ -117,8 +150,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         closeAndDestroyWriter();
 
         // Create the path for where the file is going to live.
-        try
-        {
+        try {
             // generate a file name
             String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
                     (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
@@ -142,9 +174,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
             // return the output stream
             return new OutputStreamWriter(outputStream);
-        }
-        catch (Exception e)
-        {
+        } catch (Exception e) {
             LOGGER.error(e.getMessage());
             throw e;
         }
@@ -157,8 +187,8 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
             this.closeSafely(this.currentWriter);
             this.currentWriter = null;
 
-            //
-            LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
+            // Logging of information to alert the user to the activities of this class
+            LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
         }
     }
 
@@ -167,8 +197,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
             try {
                 writer.flush();
                 writer.close();
-            }
-            catch(Exception e) {
+            } catch(Exception e) {
                 // noOp
             }
             LOGGER.debug("File Closed");
@@ -180,8 +209,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         if(flushable != null) {
             try {
                 flushable.flush();
-            }
-            catch(IOException e) {
+            } catch(IOException e) {
                 // noOp
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index dac5cd6..04aa1fe 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -1,7 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
@@ -9,13 +25,15 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.*;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.User;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 


[2/8] git commit: S3 Reader / Writer for apache streams

Posted by sb...@apache.org.
S3 Reader / Writer for apache streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/544a0c92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/544a0c92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/544a0c92

Branch: refs/heads/master
Commit: 544a0c92ecfa1ab7e1334721abeeb881cd25cd3f
Parents: 741a454
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri May 2 13:03:50 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri May 2 13:04:06 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-amazon-aws/pom.xml      |  67 +++++
 .../streams-persist-s3/pom.xml                  |  87 +++++++
 .../org/apache/streams/s3/S3Configurator.java   |  64 +++++
 .../streams/s3/S3ObjectInputStreamWrapper.java  | 111 ++++++++
 .../streams/s3/S3OutputStreamWrapper.java       | 128 +++++++++
 .../org/apache/streams/s3/S3PersistReader.java  | 141 ++++++++++
 .../apache/streams/s3/S3PersistReaderTask.java  |  87 +++++++
 .../org/apache/streams/s3/S3PersistWriter.java  | 257 +++++++++++++++++++
 .../apache/streams/s3/S3PersistWriterTask.java  |  37 +++
 .../org/apache/streams/s3/S3Configuration.json  |  25 ++
 .../streams/s3/S3ReaderConfiguration.json       |  14 +
 .../streams/s3/S3WriterConfiguration.json       |  28 ++
 13 files changed, 1047 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index d80fc63..c7bbdf4 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+		<module>streams-amazon-aws</module>
         <!--<module>streams-processor-lucene</module>-->
         <!--<module>streams-processor-tika</module>-->
         <module>streams-processor-urls</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
new file mode 100644
index 0000000..57a67cb
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-amazon-aws</artifactId>
+
+    <packaging>pom</packaging>
+    <name>streams-amazon-aws</name>
+
+    <properties>
+
+    </properties>
+
+    <modules>
+        <module>streams-persist-s3</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+	        <dependency>
+	            <groupId>com.amazonaws</groupId>
+	            <artifactId>aws-java-sdk</artifactId>
+	            <version>1.7.5</version>
+	        </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-config</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-core</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-pojo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
new file mode 100644
index 0000000..4e9b9b1
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-amazon-aws</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-s3</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3Configuration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.s3.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
new file mode 100644
index 0000000..8190404
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -0,0 +1,64 @@
+package org.apache.streams.s3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3Configurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3Configurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    public static S3Configuration detectConfiguration(Config s3) {
+        // Reads bucket, key, secretKey and the optional protocol from the supplied config.
+        S3Configuration s3Configuration = new S3Configuration();
+
+        s3Configuration.setBucket(s3.getString("bucket"));
+        s3Configuration.setKey(s3.getString("key"));
+        s3Configuration.setSecretKey(s3.getString("secretKey"));
+
+        // The Amazon S3 Library defaults to HTTPS
+        String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase();
+
+        if(!(protocol.equals("https") || protocol.equals("http"))) {
+            throw new IllegalArgumentException("S3 protocol must be 'http' or 'https' but was '" + protocol + "'");
+        }
+        // protocol was already lower-cased when it was read above
+        s3Configuration.setProtocol(protocol);
+
+        return s3Configuration;
+    }
+
+    public static S3ReaderConfiguration detectReaderConfiguration(Config s3) {
+        // Start from the shared S3 settings, then convert them into the reader-specific type.
+        S3Configuration s3Configuration = detectConfiguration(s3);
+        S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(s3Configuration, S3ReaderConfiguration.class);
+
+        s3ReaderConfiguration.setReaderPath(s3.getString("readerPath"));
+
+        return s3ReaderConfiguration;
+    }
+
+    public static S3WriterConfiguration detectWriterConfiguration(Config s3) {
+        // Start from the shared S3 settings, then layer the writer-only options on top.
+        S3Configuration s3Configuration = detectConfiguration(s3);
+        S3WriterConfiguration s3WriterConfiguration  = mapper.convertValue(s3Configuration, S3WriterConfiguration.class);
+
+        String rootPath = s3.getString("writerPath");
+
+        // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path.
+        s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/"));
+
+        s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default");
+        // read as a long so that values beyond the int range are accepted
+        if(s3.hasPath("maxFileSize"))
+            s3WriterConfiguration.setMaxFileSize(s3.getLong("maxFileSize"));
+        if(s3.hasPath("chunk"))
+            s3WriterConfiguration.setChunk(s3.getBoolean("chunk"));
+
+        return s3WriterConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
new file mode 100644
index 0000000..2a2dba0
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -0,0 +1,111 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over
+ * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read
+ * a small portion of the file, you must first "abort" the stream, then close it. Otherwise, Apache will
+ * exhaust the stream and transfer a ton of data attempting to do so.
+ *
+ *
+ * Author   Smashew
+ * Date     2014-04-11
+ *
+ * After a few more days, and some demos that had issues with concurrency and high user load, a second
+ * problem was discovered: the S3Object's HTTP connection is not released back to the connection pool
+ * (until it times out) even once the item is garbage collected, so this class closes it explicitly.
+ *
+ * Reference:
+ * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
+ */
+public class S3ObjectInputStreamWrapper extends InputStream
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
+
+    private final S3Object s3Object;
+    private final S3ObjectInputStream is;
+    private boolean isClosed = false;
+
+    public S3ObjectInputStreamWrapper(S3Object s3Object) {
+        this.s3Object = s3Object;
+        this.is = this.s3Object.getObjectContent();
+    }
+
+    public int hashCode()                                           { return this.is.hashCode(); }
+    public boolean equals(Object obj)                               { return this.is.equals(obj); }
+    public String toString()                                        { return this.is.toString(); }
+    public int read() throws IOException                            { return this.is.read(); }
+    public int read(byte[] b) throws IOException                    { return this.is.read(b); }
+    public int read(byte[] b, int off, int len) throws IOException  { return this.is.read(b, off, len); }
+    public long skip(long n) throws IOException                     { return this.is.skip(n); }
+    public int available() throws IOException                       { return this.is.available(); }
+    public boolean markSupported()                                  { return this.is.markSupported(); }
+    public synchronized void mark(int readlimit)                    { this.is.mark(readlimit); }
+    public synchronized void reset() throws IOException             { this.is.reset(); }
+
+    public void close() throws IOException {
+        ensureEverythingIsReleased();
+    }
+
+    public void ensureEverythingIsReleased()
+    {
+        if(this.isClosed)
+            return;
+
+        // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION!
+        // Amazon S3 - HTTP Exhaust all file contents issue
+        try {
+            this.is.abort();
+        }
+        catch(Exception e) {
+            LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage());
+        }
+
+        // close the input Stream Safely
+        closeSafely(this.is);
+
+        // This corrects the issue with Open HTTP connections
+        closeSafely(this.s3Object);
+        this.isClosed = true;
+    }
+
+    private static void closeSafely(Closeable is) {
+        try {
+            if(is != null)
+                is.close();
+        } catch(Exception e) {
+            // best effort: never propagate, but hand the throwable to the logger instead of stderr
+            LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage(), e);
+        }
+    }
+
+    protected void finalize( ) throws Throwable
+    {
+        try {
+            ensureEverythingIsReleased();
+        } catch(Exception e) {
+            // this should never be called, just being very cautious
+            LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage());
+        } finally {
+            super.finalize(); // always chain to the superclass finalizer, even if the release failed
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
new file mode 100644
index 0000000..8f55983
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -0,0 +1,128 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.commons.io.FilenameUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ *
+ * Author:  Smashew
+ * Date:    2014-04-14
+ *
+ * Description:
+ * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly.
+ *
+ * There is a way to upload data in chunks (5mb or so apiece), but the multi-part upload
+ * is kind of a PITA to deal with.
+ *
+ * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones
+ */
+public class S3OutputStreamWrapper extends OutputStream
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
+
+    private final AmazonS3Client amazonS3Client;
+    private final String bucketName;
+    private final String path;
+    private final String fileName;
+    private ByteArrayOutputStream outputStream;
+    private final Map<String, String> metaData;
+
+    private boolean isClosed = false;
+
+    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException
+    {
+        this.amazonS3Client = amazonS3Client;
+        this.bucketName = bucketName;
+        this.path = path;
+        this.fileName = fileName;
+        this.metaData = metaData;
+        this.outputStream = new ByteArrayOutputStream();
+    }
+
+    /*
+     * The methods that are overridden to support the 'OutputStream' object.
+     */
+
+    public void write(int b) throws IOException                         { this.outputStream.write(b); }
+    public void write(byte[] b) throws IOException                      { this.outputStream.write(b); }
+    public void write(byte[] b, int off, int len) throws IOException    { this.outputStream.write(b, off, len); }
+    public void flush() throws IOException                              { this.outputStream.flush(); }
+
+    /**
+     * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
+     * @throws IOException
+     * declared by the OutputStream contract; upload failures are caught and logged here, not rethrown
+     */
+    public void close() throws IOException {
+        if(!isClosed)
+        {
+            try
+            {
+                this.addFile();
+                this.outputStream.close();
+                this.outputStream = null;
+            }
+            catch(Exception e) {
+                // best effort: report through the logger (with the cause) rather than stderr
+                LOGGER.warn("There was an error adding the temporaryFile to S3", e);
+            }
+            finally {
+                // we are done here.
+                this.isClosed = true;
+            }
+        }
+    }
+
+    private void addFile() throws Exception {
+        // Uploads the buffered bytes to bucketName under path + fileName and waits for completion.
+        InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
+        int contentLength = outputStream.size();
+
+        TransferManager transferManager = new TransferManager(amazonS3Client);
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate());
+        metadata.setContentLength(contentLength);
+
+        metadata.addUserMetadata("writer", "org.apache.streams");
+
+        for(String s : metaData.keySet())
+            metadata.addUserMetadata(s, metaData.get(s));
+
+        String fileNameToWrite = path + fileName;
+
+        try {
+            // upload() may itself throw, so it sits inside the try to guarantee the cleanup below
+            Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
+            upload.waitForUploadResult();
+            LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
+        } finally {
+            // a failed upload is no longer swallowed: it propagates to the caller after cleanup
+            is.close();
+            // NOTE(review): 'false' is understood to keep the shared AmazonS3Client open - confirm
+            transferManager.shutdownNow(false);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
new file mode 100644
index 0000000..a987a47
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -0,0 +1,141 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
+    public final static String STREAMS_ID = "S3PersistReader";
+    protected final static char DELIMITER = '\t';
+
+    private S3ReaderConfiguration s3ReaderConfiguration;
+    private AmazonS3Client amazonS3Client;
+    private ObjectMapper mapper = new ObjectMapper();
+    private Collection<String> files;
+    private ExecutorService executor;
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
+    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client; }
+    public S3ReaderConfiguration getS3ReaderConfiguration()             { return this.s3ReaderConfiguration; }
+    public String getBucketName()                                       { return this.s3ReaderConfiguration.getBucket(); }
+    public StreamsResultSet readNew(BigInteger sequence)                { return null; }
+    public StreamsResultSet readRange(DateTime start, DateTime end)     { return null; }
+    public DatumStatusCounter getDatumStatusCounter()                   { return countersTotal; }
+    public Collection<String> getFiles()                                { return this.files; }
+
+    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
+        this.s3ReaderConfiguration = s3ReaderConfiguration;
+    }
+
+    public void prepare(Object configurationObject) {
+        // Connect to S3
+        synchronized (this)
+        {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase()));
+
+            // We want path style access
+            S3ClientOptions clientOptions = new S3ClientOptions();
+            clientOptions.setPathStyleAccess(true);
+
+            this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+            this.amazonS3Client.setS3ClientOptions(clientOptions);
+        }
+
+        final ListObjectsRequest request = new ListObjectsRequest()
+                .withBucketName(this.s3ReaderConfiguration.getBucket())
+                .withPrefix(s3ReaderConfiguration.getReaderPath())
+                .withMaxKeys(50);
+
+        // NOTE(review): no delimiter is set on the request; confirm S3 reports common prefixes without one.
+        ObjectListing listing = this.amazonS3Client.listObjects(request);
+
+        this.files = new ArrayList<String>();
+
+        /**
+         * If you can list files that are in this path, then you must be dealing with a directory
+         * if you cannot list files that are in this path, then you are most likely dealing with
+         * a simple file.
+         */
+        if(listing.getCommonPrefixes().size() > 0) {
+            // Handle the 'directory' use case
+            while (true) {
+                for (String file : listing.getCommonPrefixes())
+                    this.files.add(file);
+
+                // only fetch another batch while more remain, so the final page is not dropped
+                if (!listing.isTruncated()) break;
+                listing = this.amazonS3Client.listNextBatchOfObjects(listing);
+            }
+        }
+        else {
+            // handle the single file use-case
+            this.files.add(s3ReaderConfiguration.getReaderPath());
+        }
+
+        if(this.files.size() <= 0)
+            LOGGER.error("There are no files to read");
+
+        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+
+    public void cleanUp() { }
+
+    public StreamsResultSet readAll() {
+        startStream();
+        return new StreamsResultSet(persistQueue);
+    }
+
+    public void startStream() {
+        LOGGER.debug("startStream");
+        executor.submit(new S3PersistReaderTask(this));
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        synchronized( S3PersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            persistQueue.clear();
+        }
+        return current;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
new file mode 100644
index 0000000..70015fb
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -0,0 +1,87 @@
+package org.apache.streams.s3;
+
+import com.google.common.base.Strings;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.InputStreamReader;
+
+public class S3PersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
+
+    private S3PersistReader reader;
+
+    public S3PersistReaderTask(S3PersistReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+
+        for(String file : reader.getFiles())
+        {
+            // Create our buffered reader
+
+            S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
+            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+            LOGGER.info("Reading: {} ", file);
+
+            String line = "";
+            try {
+                while((line = bufferedReader.readLine()) != null)
+                {
+                    if( !Strings.isNullOrEmpty(line) )
+                    {
+                        reader.countersCurrent.incrementAttempt();
+                        String[] fields = line.split(Character.toString(reader.DELIMITER));
+                        StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                        write( entry );
+                        reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+                reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+            }
+
+            LOGGER.info("Completed:  " + file);
+
+            try {
+                closeSafely(file, is);
+            } catch (Exception e) {
+                e.printStackTrace();
+                LOGGER.error(e.getMessage());
+            }
+        }
+    }
+
+    private static void closeSafely(String file, Closeable closeable) {
+        try {
+            closeable.close();
+        }
+        catch(Exception e) {
+            LOGGER.error("There was an issue closing file: {}", file);
+        }
+    }
+
+
+    private void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( S3PersistReader.class ) {
+                success = reader.persistQueue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
new file mode 100644
index 0000000..c46ff03
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -0,0 +1,257 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.streams.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable
+{
+    public final static String STREAMS_ID = "S3PersistWriter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
+
+    private final static char DELIMITER = '\t';
+
+    private ObjectMapper objectMapper;
+    private AmazonS3Client amazonS3Client;
+    private S3WriterConfiguration s3WriterConfiguration;
+    private final List<String> writtenFiles = new ArrayList<String>();
+
+    private final AtomicLong totalBytesWritten = new AtomicLong();
+    private AtomicLong bytesWrittenThisFile = new AtomicLong();
+
+    private final AtomicInteger totalRecordsWritten = new AtomicInteger();
+    private AtomicInteger fileLineCounter = new AtomicInteger();
+
+    private Map<String, String> objectMetaData = new HashMap<String, String>() {{
+        put("line[0]", "id");
+        put("line[1]", "timeStamp");
+        put("line[2]", "metaData");
+        put("line[3]", "document");
+    }};
+
+    private OutputStreamWriter currentWriter = null;
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client; }
+    public S3WriterConfiguration getS3WriterConfiguration()             { return this.s3WriterConfiguration; }
+    public List<String> getWrittenFiles()                               { return this.writtenFiles; }
+    public Map<String, String> getObjectMetaData()                      { return this.objectMetaData; }
+    public ObjectMapper getObjectMapper()                               { return this.objectMapper; }
+
+    public void setObjectMapper(ObjectMapper mapper)                    { this.objectMapper = mapper; }
+    public void setObjectMetaData(Map<String, String> val)              { this.objectMetaData = val; }
+
+    /**
+     * Creates a writer around a pre-existing amazonS3Client, which allows the client to be re-used.
+     * @param amazonS3Client
+     * An existing client; when it is non-null, prepare() does not create another one.
+     * @param s3WriterConfiguration
+     * Configuration of the write paths and instructions; this is still required.
+     */
+    public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
+        this.amazonS3Client = amazonS3Client;
+        this.s3WriterConfiguration = s3WriterConfiguration;
+    }
+
+    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {   // no client given: prepare() builds one
+        this.s3WriterConfiguration = s3WriterConfiguration;
+    }
+
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        synchronized (this)
+        {
+            // Check to see if we need to reset the file that we are currently working with
+            if (this.currentWriter == null || ( this.bytesWrittenThisFile.get()  >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
+                try {
+                    LOGGER.info("Resetting the file");
+                    this.currentWriter = resetFile();
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            String line = convertResultToString(streamsDatum);
+
+            try {
+                this.currentWriter.write(line);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            // add the bytes we've written
+            int recordSize = line.getBytes().length;
+            this.totalBytesWritten.addAndGet(recordSize);
+            this.bytesWrittenThisFile.addAndGet(recordSize);
+
+            // increment the record count
+            this.totalRecordsWritten.incrementAndGet();
+            this.fileLineCounter.incrementAndGet();
+        }
+
+    }
+
+    private synchronized OutputStreamWriter resetFile() throws Exception
+    {   // Closes the file being written (if any), opens a new one and returns the writer for it.
+        // An open file that has no records yet is returned unchanged instead of being replaced.
+        if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
+            return this.currentWriter;
+
+        closeAndDestroyWriter();
+
+        // Create the path for where the file is going to live.
+        try
+        {
+            // generate a file name: <prefix>/<epoch millis>.tsv when chunking, otherwise <prefix>-<epoch millis>.tsv
+            String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
+                    (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
+
+            // create the output stream for bucket / writerPath / fileName, tagged with the column metadata
+            OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
+                    this.s3WriterConfiguration.getBucket(),
+                    this.s3WriterConfiguration.getWriterPath(),
+                    fileName,
+                    this.objectMetaData);
+
+            // reset the per-file record and byte counters
+            this.fileLineCounter = new AtomicInteger();
+            this.bytesWrittenThisFile = new AtomicLong();
+
+            // add this to the list of written files
+            writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
+
+            // Log that we are creating this file
+            LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
+
+            // wrap the stream in a character writer (platform default charset) and return it
+            return new OutputStreamWriter(outputStream);
+        }
+        catch (Exception e)
+        {
+            LOGGER.error(e.getMessage());   // only the message is logged here; the exception itself goes to the caller
+            throw e;
+        }
+    }
+
+    private synchronized void closeAndDestroyWriter() {
+        // Flush and close the open writer, if any, then clear the field so that write() sees no open file.
+        if (this.currentWriter != null) {
+            this.safeFlush(this.currentWriter);
+            this.closeSafely(this.currentWriter);
+            this.currentWriter = null;
+
+            // Report the totals of the file just closed; its name is the last entry of writtenFiles.
+            LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
+        }
+    }
+
+    /** Closes the writer (which flushes it first); a null argument is ignored and a failure is logged, not thrown. */
+    private synchronized void closeSafely(Writer writer)  {
+        if(writer != null) {
+            try {
+                writer.close();   // close() flushes itself, so a failing flush() can no longer skip the close
+            }
+            catch(Exception e) {
+                LOGGER.warn("Unable to close the current file cleanly", e);
+            }
+            LOGGER.debug("File Closed");
+        }
+    }
+
+    /** Best-effort flush: a null argument is ignored and an IOException from flush() is swallowed. */
+    private void safeFlush(Flushable flushable) {
+        if(flushable == null)
+            return;
+        try {
+            flushable.flush();
+        }
+        catch(IOException ignored) {
+            // noOp (the original author notes the target is backed by a ByteArrayOutputStream)
+        }
+    }
+
+
+    /** Builds the TSV line (id, timestamp, metadata JSON, document JSON) or returns null when there is no document JSON. */
+    private String convertResultToString(StreamsDatum entry)
+    {
+        if (entry.getDocument() == null)
+            return null;
+
+        // Save the class name that it came from. This has to happen before the metadata is
+        // serialized, otherwise the value never reaches the metadata column of this line.
+        entry.metadata.put("class", entry.getDocument().getClass().getName());
+        String metadata = null;
+        try {
+            metadata = objectMapper.writeValueAsString(entry.getMetadata());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Unable to serialize the metadata of {}", entry.getId(), e);
+        }
+        String documentJson = null;
+        try {
+            documentJson = objectMapper.writeValueAsString(entry.getDocument());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Unable to serialize the document of {}", entry.getId(), e);
+        }
+        if(Strings.isNullOrEmpty(documentJson))
+            return null;
+        return  entry.getId()           + DELIMITER +   // [0] = Unique id of the verbatim
+                entry.getTimestamp()    + DELIMITER +   // [1] = Timestamp of the item
+                metadata                + DELIMITER +   // [2] = Metadata of the item
+                documentJson            + "\n";         // [3] = The actual object
+    }
+
+    public void prepare(Object configurationObject) {
+        // Fills in what was not supplied: a default ObjectMapper and an AmazonS3Client. configurationObject is not read.
+        synchronized (this) {
+
+            // if the user has chosen to not set the object mapper, then set a default object mapper for them.
+            if(this.objectMapper == null)
+                this.objectMapper = new ObjectMapper();
+
+            // Build a client from the configured key, secret key and protocol only when none was passed in
+            if(this.amazonS3Client == null)
+            {
+                AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
+
+                ClientConfiguration clientConfig = new ClientConfiguration();
+                clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase()));
+
+                // We want path style access
+                S3ClientOptions clientOptions = new S3ClientOptions();
+                clientOptions.setPathStyleAccess(true);
+
+                this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+                this.amazonS3Client.setS3ClientOptions(clientOptions);
+            }
+        }
+    }
+
+    public void cleanUp() {   // flushes and closes the file currently being written, if any
+        closeAndDestroyWriter();
+    }
+
+    public DatumStatusCounter getDatumStatusCounter() {   // builds a fresh counter from totalRecordsWritten
+        DatumStatusCounter counters = new DatumStatusCounter();
+        counters.incrementAttempt(this.totalRecordsWritten.get());   // attempts and successes are both the written count;
+        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());   // no failure status is recorded here
+        return counters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
new file mode 100644
index 0000000..d791c87
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
@@ -0,0 +1,37 @@
+package org.apache.streams.s3;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class S3PersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriterTask.class);
+
+    private S3PersistWriter writer;
+
+    public S3PersistWriterTask(S3PersistWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+        while(true) {
+            if( writer.persistQueue.peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                Thread.sleep(new Random().nextInt(1));
+            } catch (InterruptedException e) {}
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
new file mode 100644
index 0000000..863668f
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -0,0 +1,25 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.s3.S3Configuration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "key": {
+            "type": "string",
+            "description": "Your Amazon Key"
+        },
+        "secretKey": {
+            "type": "string",
+            "description": "Your Amazon Secret Key"
+        },
+        "bucket": {
+            "type": "string",
+            "description": "The AWS bucket you want to write to"
+        },
+        "protocol": {
+            "type": "string",
+            "description": "Whether you are using HTTP or HTTPS"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json
new file mode 100644
index 0000000..2959b3d
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.s3.S3ReaderConfiguration",
+    "extends": {"$ref":"S3Configuration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "readerPath": {
+            "type": "string",
+            "description": "Path below root path"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
new file mode 100644
index 0000000..f43087b
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
@@ -0,0 +1,28 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.s3.S3WriterConfiguration",
+    "extends": {"$ref":"S3Configuration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "writerPath": {
+            "type": "string",
+            "description": "Path within the bucket under which files are written"
+        },
+        "writerFilePrefix": {
+            "type": "string",
+            "description": "File Prefix"
+        },
+        "maxFileSize": {
+            "type": "integer",
+            "default" : 20,
+            "description": "If files are elected to be 'chunked' which they are by default, this is the maximum size of that file before the byte array stream is vacated and the file is created."
+        },
+        "chunk": {
+            "type": "boolean",
+            "default" : true,
+            "description": "Whether you want the file chunked inside of a folder or not"
+        }
+    }
+}
\ No newline at end of file


[6/8] git commit: That was a debugging statement, pardon.

Posted by sb...@apache.org.
That was a debugging statement, pardon.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/361d122a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/361d122a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/361d122a

Branch: refs/heads/master
Commit: 361d122a5a8c93064e10f8c1817a8268d0bea3ea
Parents: 1a03d5f
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:22:20 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:22:20 2014 -0500

----------------------------------------------------------------------
 .../provider/TwitterStreamConfigurator.java      | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/361d122a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 5435f24..7c7ef1b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -9,7 +9,6 @@ import org.apache.streams.twitter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -76,23 +75,7 @@ public class TwitterStreamConfigurator {
     }
 
     public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) {
-
-        TwitterUserInformationConfiguration twitterUserInformationConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class);
-
-        try {
-            if(config.hasPath("info"))
-            {
-                List<String> info = new ArrayList<String>();
-
-                for (String s : config.getStringList("info"))
-                    info.add(s);
-            }
-        }
-        catch(Exception e) {
-            LOGGER.error("There was an error: {}", e.getMessage());
-        }
-
-        return twitterUserInformationConfiguration;
+        return mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class);
     }
 
 }


[3/8] git commit: https://issues.apache.org/jira/browse/STREAMS-73

Posted by sb...@apache.org.
https://issues.apache.org/jira/browse/STREAMS-73


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/651be792
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/651be792
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/651be792

Branch: refs/heads/master
Commit: 651be7921193eede7218fee62232ac3faf5abfaa
Parents: 544a0c9
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri May 2 13:35:39 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri May 2 13:35:39 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/LocalStreamProcessMonitorThread.java | 8 +++++++-
 .../streams/local/tasks/StatusCounterMonitorRunnable.java    | 6 ++++++
 .../streams/local/tasks/StatusCounterMonitorThread.java      | 8 +++++++-
 3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/651be792/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
index 0b254b6..c1827df 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
@@ -7,7 +7,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.util.concurrent.Executor;
 
-public class LocalStreamProcessMonitorThread implements Runnable
+public class LocalStreamProcessMonitorThread implements StatusCounterMonitorRunnable
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class);
 
@@ -22,11 +22,17 @@ public class LocalStreamProcessMonitorThread implements Runnable
         this.seconds = delayInSeconds;
     }
 
+    @Override
     public void shutdown(){
         this.run = false;
     }
 
     @Override
+    public boolean isRunning() {
+        return this.run;
+    }
+
+    @Override
     public void run()
     {
         while(run){

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/651be792/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
new file mode 100644
index 0000000..ee6e102
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
@@ -0,0 +1,6 @@
+package org.apache.streams.local.tasks;
+
+public interface StatusCounterMonitorRunnable extends Runnable {   // a monitor task that can be asked to stop
+    void shutdown();       // asks the monitor to stop; the implementations in this commit clear the flag their run() loop tests
+    boolean isRunning();   // the implementations in this commit return that same flag
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/651be792/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
index c6febbe..7579209 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
@@ -4,7 +4,7 @@ import org.apache.streams.core.DatumStatusCountable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StatusCounterMonitorThread implements Runnable
+public class StatusCounterMonitorThread implements StatusCounterMonitorRunnable
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class);
 
@@ -19,11 +19,17 @@ public class StatusCounterMonitorThread implements Runnable
         this.seconds = delayInSeconds;
     }
 
+    @Override
     public void shutdown(){
         this.run = false;
     }
 
     @Override
+    public boolean isRunning() {
+        return this.run;
+    }
+
+    @Override
     public void run()
     {
         while(run){