You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:25 UTC

[38/52] [abbrv] git commit: Fixed memory leak in ES reader pipelines

Fixed memory leak in ES reader pipelines


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f877c5fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f877c5fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f877c5fa

Branch: refs/heads/sblackmon
Commit: f877c5fa46918f09a429139dbe5d2dfbf5eb4ea3
Parents: 79ac9aa
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Apr 11 07:17:13 2014 -0600
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Apr 11 07:17:13 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 streams-config/pom.xml                          |  2 +-
 streams-contrib/pom.xml                         |  2 +-
 .../streams-persist-cassandra/pom.xml           |  2 +-
 streams-contrib/streams-persist-console/pom.xml |  2 +-
 .../streams-persist-elasticsearch/pom.xml       |  2 +-
 .../ElasticsearchPersistReader.java             | 22 ++++++++++----------
 .../ElasticsearchPersistReaderTask.java         | 14 ++++++++++++-
 streams-contrib/streams-persist-hbase/pom.xml   |  2 +-
 streams-contrib/streams-persist-hdfs/pom.xml    |  2 +-
 streams-contrib/streams-persist-kafka/pom.xml   |  2 +-
 streams-contrib/streams-persist-mongo/pom.xml   |  2 +-
 streams-contrib/streams-processor-urls/pom.xml  |  4 ++--
 .../streams-provider-datasift/pom.xml           |  2 +-
 .../streams-provider-facebook/pom.xml           |  2 +-
 .../gnip-edc-facebook/pom.xml                   |  2 +-
 .../gnip-edc-flickr/pom.xml                     |  2 +-
 .../gnip-edc-googleplus/pom.xml                 |  2 +-
 .../gnip-edc-instagram/pom.xml                  |  2 +-
 .../gnip-edc-reddit/pom.xml                     |  2 +-
 .../gnip-edc-youtube/pom.xml                    |  2 +-
 .../gnip-powertrack/pom.xml                     |  2 +-
 streams-contrib/streams-provider-gnip/pom.xml   |  2 +-
 .../google-gmail/pom.xml                        |  2 +-
 .../google-gplus/pom.xml                        |  2 +-
 streams-contrib/streams-provider-google/pom.xml |  2 +-
 .../streams-provider-moreover/pom.xml           |  2 +-
 streams-contrib/streams-provider-rss/pom.xml    |  2 +-
 .../streams-provider-sysomos/pom.xml            |  2 +-
 .../streams-provider-twitter/pom.xml            |  2 +-
 streams-core/pom.xml                            |  2 +-
 .../activity-consumer/pom.xml                   |  2 +-
 .../activity-registration/pom.xml               |  2 +-
 .../activity-subscriber/pom.xml                 |  2 +-
 streams-osgi-components/pom.xml                 |  2 +-
 .../streams-components-all/pom.xml              |  2 +-
 streams-pojo/pom.xml                            |  2 +-
 streams-runtimes/pom.xml                        |  2 +-
 streams-runtimes/streams-runtime-local/pom.xml  |  2 +-
 .../streams/local/tasks/BaseStreamsTask.java    | 14 +++----------
 streams-runtimes/streams-runtime-pig/pom.xml    |  2 +-
 streams-runtimes/streams-runtime-storm/pom.xml  |  2 +-
 streams-runtimes/streams-runtime-webapp/pom.xml |  2 +-
 streams-util/pom.xml                            |  2 +-
 streams-web/pom.xml                             |  2 +-
 45 files changed, 70 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 069a100..78f5d6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <groupId>org.apache.streams</groupId>
     <artifactId>streams-project</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
 
     <name>Apache Streams Project</name>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-config/pom.xml
----------------------------------------------------------------------
diff --git a/streams-config/pom.xml b/streams-config/pom.xml
index 92dd7c8..f31301c 100644
--- a/streams-config/pom.xml
+++ b/streams-config/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-project</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>streams-config</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 727fa28..d80fc63 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>streams-project</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
index 33967b0..fd6711f 100644
--- a/streams-contrib/streams-persist-cassandra/pom.xml
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>streams-persist-cassandra</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-console/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/pom.xml b/streams-contrib/streams-persist-console/pom.xml
index d62da1c..c7f2cd3 100644
--- a/streams-contrib/streams-persist-console/pom.xml
+++ b/streams-contrib/streams-persist-console/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 6af675c..07433ba 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4fde58d..ea74bf6 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -11,9 +11,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -121,7 +119,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     public void prepare(Object o) {
 
         mapper = StreamsJacksonMapper.getInstance();
-        persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
 
         // If we haven't already set up the search, then set up the search.
         if(search == null)
@@ -290,14 +288,16 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     @Override
     public StreamsResultSet readCurrent() {
 
-        LOGGER.debug("readCurrent: {}", persistQueue.size());
+        StreamsResultSet current;
 
-        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
-        Iterators.addAll(currentIterator, persistQueue.iterator());
-
-        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
-
-        persistQueue.clear();
+        synchronized( ElasticsearchPersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            current.setCounter(new DatumStatusCounter());
+//            current.getCounter().add(countersCurrent);
+//            countersTotal.add(countersCurrent);
+//            countersCurrent = new DatumStatusCounter();
+            persistQueue.clear();
+        }
 
         return current;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
index 505dc01..7750fac 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
@@ -4,11 +4,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
 import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Queue;
 import java.util.Random;
 
 public class ElasticsearchPersistReaderTask implements Runnable {
@@ -40,7 +42,7 @@ public class ElasticsearchPersistReaderTask implements Runnable {
             item.getMetadata().put("id", hit.getId());
             item.getMetadata().put("index", hit.getIndex());
             item.getMetadata().put("type", hit.getType());
-            reader.persistQueue.offer(item);
+            write(item);
         }
         try {
             Thread.sleep(new Random().nextInt(100));
@@ -48,4 +50,14 @@ public class ElasticsearchPersistReaderTask implements Runnable {
 
     }
 
+    private void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( ElasticsearchPersistReader.class ) {
+                success = reader.persistQueue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/pom.xml b/streams-contrib/streams-persist-hbase/pom.xml
index 04f8c39..18ec32e 100644
--- a/streams-contrib/streams-persist-hbase/pom.xml
+++ b/streams-contrib/streams-persist-hbase/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index a111f1a..5fe33b3 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index 6ee84bb..84ddee8 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/pom.xml b/streams-contrib/streams-persist-mongo/pom.xml
index ae0f91d..fbe1723 100644
--- a/streams-contrib/streams-persist-mongo/pom.xml
+++ b/streams-contrib/streams-persist-mongo/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-processor-urls/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml
index 3e820ce..b320ca5 100644
--- a/streams-contrib/streams-processor-urls/pom.xml
+++ b/streams-contrib/streams-processor-urls/pom.xml
@@ -5,12 +5,12 @@
 
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-processor-urls</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
 
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 0f249b5..6d59514 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml
index 6036690..0d54255 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-contrib</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-provider-facebook</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml
index 8c0cab2..e0603bd 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-provider-gnip</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml
index 25b77f7..e96b90c 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-provider-gnip</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml
index 1d760e7..03f74c1 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-gnip</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml
index acdb119..eced509 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-gnip</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml
index 8eb1b92..dea0089 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-gnip</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml
index 14b5a31..b9e6e1e 100644
--- a/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streams-provider-gnip</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml b/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml
index ad17ae7..cbe47f5 100644
--- a/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml
+++ b/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-gnip</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-gnip/pom.xml b/streams-contrib/streams-provider-gnip/pom.xml
index fd80329..68d6591 100644
--- a/streams-contrib/streams-provider-gnip/pom.xml
+++ b/streams-contrib/streams-provider-gnip/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/google-gmail/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/pom.xml b/streams-contrib/streams-provider-google/google-gmail/pom.xml
index 2d72ce0..4240572 100644
--- a/streams-contrib/streams-provider-google/google-gmail/pom.xml
+++ b/streams-contrib/streams-provider-google/google-gmail/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-google</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/google-gplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml
index ac10497..facb6ef 100644
--- a/streams-contrib/streams-provider-google/google-gplus/pom.xml
+++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-provider-google</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/pom.xml b/streams-contrib/streams-provider-google/pom.xml
index 00feb60..b720b76 100644
--- a/streams-contrib/streams-provider-google/pom.xml
+++ b/streams-contrib/streams-provider-google/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-moreover/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml
index 178d383..b3dcf8e 100644
--- a/streams-contrib/streams-provider-moreover/pom.xml
+++ b/streams-contrib/streams-provider-moreover/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-rss/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/pom.xml b/streams-contrib/streams-provider-rss/pom.xml
index ac258a9..c9f24d5 100644
--- a/streams-contrib/streams-provider-rss/pom.xml
+++ b/streams-contrib/streams-provider-rss/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-sysomos/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/pom.xml b/streams-contrib/streams-provider-sysomos/pom.xml
index 82534d7..9880457 100644
--- a/streams-contrib/streams-provider-sysomos/pom.xml
+++ b/streams-contrib/streams-provider-sysomos/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 8153270..3c27b8c 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-contrib</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index 4c81008..9546b5f 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>streams-project</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-consumer/pom.xml
----------------------------------------------------------------------
diff --git a/streams-osgi-components/activity-consumer/pom.xml b/streams-osgi-components/activity-consumer/pom.xml
index 6f6aa06..f4964d5 100644
--- a/streams-osgi-components/activity-consumer/pom.xml
+++ b/streams-osgi-components/activity-consumer/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.streams.osgi.components</groupId>
     <artifactId>streams-osgi-components</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>activity-consumer</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-registration/pom.xml
----------------------------------------------------------------------
diff --git a/streams-osgi-components/activity-registration/pom.xml b/streams-osgi-components/activity-registration/pom.xml
index 46faabf..9ab8a74 100644
--- a/streams-osgi-components/activity-registration/pom.xml
+++ b/streams-osgi-components/activity-registration/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.streams.osgi.components</groupId>
     <artifactId>streams-osgi-components</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>activity-registration</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-subscriber/pom.xml
----------------------------------------------------------------------
diff --git a/streams-osgi-components/activity-subscriber/pom.xml b/streams-osgi-components/activity-subscriber/pom.xml
index 4f8ee1a..367fee7 100644
--- a/streams-osgi-components/activity-subscriber/pom.xml
+++ b/streams-osgi-components/activity-subscriber/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.streams.osgi.components</groupId>
     <artifactId>streams-osgi-components</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>activity-subscriber</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-osgi-components/pom.xml b/streams-osgi-components/pom.xml
index 078586b..d7f68cd 100644
--- a/streams-osgi-components/pom.xml
+++ b/streams-osgi-components/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.streams</groupId>
     <artifactId>streams-project</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <groupId>org.apache.streams.osgi.components</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/streams-components-all/pom.xml
----------------------------------------------------------------------
diff --git a/streams-osgi-components/streams-components-all/pom.xml b/streams-osgi-components/streams-components-all/pom.xml
index 2389cdf..448f1a4 100644
--- a/streams-osgi-components/streams-components-all/pom.xml
+++ b/streams-osgi-components/streams-components-all/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.apache.streams.osgi.components</groupId>
     <artifactId>streams-osgi-components</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>streams-components-all</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index 8a9f5c0..a3a12f6 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-project</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 47418f1..5d43c28 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-project</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>streams-runtimes</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 2e9d6e3..b7ddb9a 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-runtimes</artifactId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>streams-runtime-local</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 8006560..a7f988e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +75,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
      */
     protected void addToOutgoingQueue(StreamsDatum datum) {
         if(this.outQueues.size() == 1) {
-            enqueue(outQueues.get(0), datum);
+            ComponentUtils.offerUntilSuccess(datum, outQueues.get(0));
         }
         else {
             StreamsDatum newDatum = null;
@@ -82,7 +83,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
                 try {
                     newDatum = cloneStreamsDatum(datum);
                     if(newDatum != null) {
-                        enqueue(queue, newDatum);
+                        ComponentUtils.offerUntilSuccess(newDatum, queue);
                     }
                 } catch (RuntimeException e) {
                     LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
@@ -145,15 +146,6 @@ public abstract class BaseStreamsTask implements StreamsTask {
         return this.inIndex;
     }
 
-    private void enqueue( Queue<StreamsDatum> queue, StreamsDatum entry ) {
-        boolean success;
-        do {
-            success = queue.offer(entry);
-            Thread.yield();
-        }
-        while( !success );
-    }
-
     private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) {
         Map<String, Object> fromMeta = copyFrom.getMetadata();
         Map<String, Object> toMeta = copyTo.getMetadata();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 6d91a2f..e62a7c8 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>streams-runtimes</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-runtime-pig</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml
index 921d346..edaa760 100644
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ b/streams-runtimes/streams-runtime-storm/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>streams-runtimes</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-runtime-storm</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/pom.xml b/streams-runtimes/streams-runtime-webapp/pom.xml
index c1d21b7..23b00f3 100644
--- a/streams-runtimes/streams-runtime-webapp/pom.xml
+++ b/streams-runtimes/streams-runtime-webapp/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.streams</groupId>
     <artifactId>streams-runtimes</artifactId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>streams-runtime-webapp</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-util/pom.xml
----------------------------------------------------------------------
diff --git a/streams-util/pom.xml b/streams-util/pom.xml
index 5249021..0a48ec9 100644
--- a/streams-util/pom.xml
+++ b/streams-util/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>streams-project</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.1-SPRINGCLEANING</version>
+        <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-web/pom.xml
----------------------------------------------------------------------
diff --git a/streams-web/pom.xml b/streams-web/pom.xml
index 612fe72..89ca1b5 100644
--- a/streams-web/pom.xml
+++ b/streams-web/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>streams-project</artifactId>
     <groupId>org.apache.streams</groupId>
-    <version>0.1-SPRINGCLEANING</version>
+    <version>0.1-SNAPSHOT</version>
   </parent>
   <artifactId>streams-web</artifactId>
   <packaging>war</packaging>