You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/02/24 17:33:35 UTC

svn commit: r1571339 - in /incubator/streams/trunk: ./ streams-config-graph/ streams-config/ streams-contrib/ streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ streams-contrib/streams-persist-elasticsearch/ streams-contr...

Author: mfranklin
Date: Mon Feb 24 16:33:34 2014
New Revision: 1571339

URL: http://svn.apache.org/r1571339
Log:
Reverting trunk to its state as of r1569602

Added:
    incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
      - copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
    incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
      - copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
    incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter (2).iml
      - copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter (2).iml
    incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter.iml
      - copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter.iml
Removed:
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java
Modified:
    incubator/streams/trunk/pom.xml
    incubator/streams/trunk/streams-config-graph/pom.xml
    incubator/streams/trunk/streams-config/pom.xml
    incubator/streams/trunk/streams-contrib/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
    incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
    incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
    incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
    incubator/streams/trunk/streams-core/pom.xml
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java

Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Mon Feb 24 16:33:34 2014
@@ -74,7 +74,7 @@
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
         <typesafe.config.version>1.2.0</typesafe.config.version>
-        <guava.version>16.0.1</guava.version>
+        <guava.version>15.0</guava.version>
         <scala.version>2.8.0</scala.version>
         <clojure.version>1.4.0</clojure.version>
         <storm.version>0.9.0.1</storm.version>
@@ -94,7 +94,7 @@
         <module>streams-config</module>
         <module>streams-config-graph</module>
         <module>streams-pojo</module>
-        <!--<module>streams-storm</module>-->
+        <module>streams-storm</module>
         <module>streams-util</module>
         <module>streams-pojo-extensions</module>
         <module>streams-contrib</module>

Modified: incubator/streams/trunk/streams-config-graph/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config-graph/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config-graph/pom.xml (original)
+++ incubator/streams/trunk/streams-config-graph/pom.xml Mon Feb 24 16:33:34 2014
@@ -37,8 +37,9 @@
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
+        <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
+            <version>15.0</version>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>

Modified: incubator/streams/trunk/streams-config/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config/pom.xml (original)
+++ incubator/streams/trunk/streams-config/pom.xml Mon Feb 24 16:33:34 2014
@@ -34,10 +34,22 @@
         <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
+            <version>1.2.0</version>
         </dependency>
+        <!--<dependency>-->
+            <!--<groupId>commons-configuration</groupId>-->
+            <!--<artifactId>commons-configuration</artifactId>-->
+            <!--<version>1.10</version>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
+            <version>15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.collections</groupId>
+            <artifactId>google-collections</artifactId>
+            <version>1.0</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

Modified: incubator/streams/trunk/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/pom.xml Mon Feb 24 16:33:34 2014
@@ -38,17 +38,15 @@
     <modules>
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
-        <!--<module>streams-persist-hbase</module>-->
-        <!--<module>streams-persist-hdfs</module>-->
-        <!--<module>streams-persist-kafka</module>-->
-        <!--<module>streams-persist-mongo</module>-->
-        <!--<module>streams-provider-datasift</module>-->
-        <!--<module>streams-provider-facebook</module>-->
-        <!--<module>streams-provider-gnip</module>-->
-        <!--<module>streams-provider-moreover</module>-->
+        <module>streams-persist-hdfs</module>
+        <module>streams-persist-kafka</module>
+        <module>streams-provider-datasift</module>
+        <module>streams-provider-facebook</module>
+        <module>streams-provider-gnip</module>
+        <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>
-        <!--<module>streams-provider-sysomos</module>-->
-        <!--<module>streams-provider-rss</module>-->
+        <module>streams-provider-sysomos</module>
+        <module>streams-provider-rss</module>
         <!--<module>streams-proxy-semantria</module>-->
     </modules>
 

Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Mon Feb 24 16:33:34 2014
@@ -5,31 +5,42 @@ import com.fasterxml.jackson.databind.Ob
 import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.tasks.StreamsPersistWriterTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class ConsolePersistWriter extends StreamsPersistWriterTask implements StreamsPersistWriter  {
+public class ConsolePersistWriter implements StreamsPersistWriter {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
+    protected volatile Queue<StreamsDatum> persistQueue;
+
     private ObjectMapper mapper = new ObjectMapper();
 
-    public ConsolePersistWriter(StreamsPersistWriter writer) {
-        super(writer);
+    public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
     }
 
     @Override
-    public void prepare(Object o) {
-        Preconditions.checkNotNull(this.getInputQueues());
+    public void start() {
+        Preconditions.checkNotNull(persistQueue);
+        new Thread(new ConsolePersistWriterTask(this)).start();
     }
 
     @Override
-    public void cleanUp() {
+    public void stop() {
+
+    }
 
+    @Override
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return this.persistQueue;
     }
 
     @Override

Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml Mon Feb 24 16:33:34 2014
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>0.90.5</version>
+            <version>1.0.0.RC2</version>
             <scope>compile</scope>
             <type>jar</type>
         </dependency>

Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java Mon Feb 24 16:33:34 2014
@@ -1,14 +1,10 @@
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
@@ -24,7 +20,6 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigInteger;
 import java.util.*;
 import java.util.concurrent.*;
@@ -35,7 +30,7 @@ import java.util.concurrent.*;
  * steveblackmon
  **************************************************************************************************************/
 
-public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>
+public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>, Runnable
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
 
@@ -58,10 +53,6 @@ public class ElasticsearchPersistReader 
     private int batchSize = 100;
     private String scrollTimeout = null;
 
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private ElasticsearchConfiguration config;
-
     private QueryBuilder queryBuilder;
     private FilterBuilder filterBuilder;
 
@@ -93,10 +84,16 @@ public class ElasticsearchPersistReader 
     public void setQueryBuilder(QueryBuilder queryBuilder)      { this.queryBuilder = queryBuilder; }
     public void setFilterBuilder(FilterBuilder filterBuilder)   { this.filterBuilder = filterBuilder; }
 
-    public ElasticsearchPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+    ListenableFuture providerTaskComplete;
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
     }
+
     public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration) {
         this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
     }
@@ -120,59 +117,6 @@ public class ElasticsearchPersistReader 
         this.withoutfields = withoutfields;
     }
 
-    @Override
-    public void prepare(Object o) {
-
-        // If we haven't already set up the search, then set up the search.
-        if(search == null)
-        {
-            search = elasticsearchClientManager.getClient()
-                    .prepareSearch(indexes)
-                    .setSearchType(SearchType.SCAN)
-                    .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
-                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
-
-            if(this.queryBuilder != null)
-                search.setQuery(this.queryBuilder);
-
-            // If the types are null, then don't specify a type
-            if(this.types != null && this.types.length > 0)
-                search = search.setTypes(types);
-
-            Integer clauses = 0;
-            if(this.withfields != null || this.withoutfields != null) {
-                if( this.withfields != null )
-                    clauses += this.withfields.length;
-                if( this.withoutfields != null )
-                    clauses += this.withoutfields.length;
-            }
-
-            List<FilterBuilder> filterList = buildFilterList();
-
-            FilterBuilder allFilters = andFilters(filterList);
-
-            if( clauses > 0 ) {
-                //    search.setPostFilter(allFilters);
-                search.setFilter(allFilters);
-            }
-
-            // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
-            if(this.random)
-                search = search.addSort(SortBuilders.scriptSort("random()", "number"));
-        }
-
-        // We don't have a scroll, we need to create a scroll
-        if(scrollResp == null) {
-            scrollResp = search.execute().actionGet();
-            LOGGER.trace(search.toString());
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-        LOGGER.info("PersistReader done");
-    }
-
     public void setWithfields(String[] withfields) {
         this.withfields = withfields;
     }
@@ -191,6 +135,49 @@ public class ElasticsearchPersistReader 
     {
         try
         {
+            // If we haven't already set up the search, then set up the search.
+            if(search == null)
+            {
+                search = elasticsearchClientManager.getClient()
+                        .prepareSearch(indexes)
+                        .setSearchType(SearchType.SCAN)
+                        .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+
+                if(this.queryBuilder != null)
+                    search.setQuery(this.queryBuilder);
+
+                // If the types are null, then don't specify a type
+                if(this.types != null && this.types.length > 0)
+                    search = search.setTypes(types);
+
+                Integer clauses = 0;
+                if(this.withfields != null || this.withoutfields != null) {
+                    if( this.withfields != null )
+                        clauses += this.withfields.length;
+                    if( this.withoutfields != null )
+                        clauses += this.withoutfields.length;
+                }
+
+                List<FilterBuilder> filterList = buildFilterList();
+
+                FilterBuilder allFilters = andFilters(filterList);
+
+                if( clauses > 0 ) {
+                    search.setPostFilter(allFilters);
+                }
+
+                // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
+                if(this.random)
+                    search = search.addSort(SortBuilders.scriptSort("random()", "number"));
+            }
+
+            // We don't have a scroll, we need to create a scroll
+            if(scrollResp == null) {
+                scrollResp = search.execute().actionGet();
+                LOGGER.trace(search.toString());
+            }
+
             // We have exhausted our scroll create another scroll.
             if(scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length)
             {
@@ -293,25 +280,44 @@ public class ElasticsearchPersistReader 
     }
 
     @Override
-    public StreamsResultSet readCurrent() {
+    public void run() {
 
-        Queue<StreamsDatum> currentQueue = new LinkedBlockingQueue<StreamsDatum>();
-        while( hasNext()) {
-            SearchHit hit = next();
-            ObjectNode jsonObject = null;
+        providerTaskComplete = executor.submit((new Thread(new ElasticsearchPersistReaderTask(this))));
+
+        while( !providerTaskComplete.isDone()) {
             try {
-                jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-                break;
-            }
-            StreamsDatum item = new StreamsDatum(jsonObject);
-            item.getMetadata().put("id", hit.getId());
-            item.getMetadata().put("index", hit.getIndex());
-            item.getMetadata().put("type", hit.getType());
-            currentQueue.add(item);
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) { }
         }
-        return (StreamsResultSet)currentQueue;
+
+        stop();
+    }
+
+    @Override
+    public void start() {
+
+
+    }
+
+    @Override
+    public void stop() {
+        shutdownAndAwaitTermination(executor);
+        LOGGER.info("PersistReader done");
+    }
+
+    @Override
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return this.persistQueue;
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        return null;
     }
 
     @Override
@@ -324,6 +330,21 @@ public class ElasticsearchPersistReader 
         return null;
     }
 
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
 }
-
-

Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Mon Feb 24 16:33:34 2014
@@ -9,13 +9,11 @@ import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.tasks.StreamsPersistWriterTask;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -38,7 +36,7 @@ import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
     private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
@@ -48,13 +46,12 @@ public class ElasticsearchPersistWriter 
     private Client client;
     private String parentID = null;
     private BulkRequestBuilder bulkRequest;
-    private OutputStreamWriter currentWriter = null;
 
     private String index = null;
     private String type = null;
     private int batchSize = 50;
     private int totalRecordsWritten = 0;
-    private boolean veryLargeBulk = false;  // by default this setting is set to false
+    private OutputStreamWriter currentWriter = null;
 
     private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
     private static final long WAITING_DOCS_LIMIT = 10000;
@@ -71,22 +68,7 @@ public class ElasticsearchPersistWriter 
     private volatile long batchSizeInBytes = 0;
     private volatile int batchItemsSent = 0;
 
-    public void setIndex(String index) {
-        this.index = index;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
-
+    private boolean veryLargeBulk = false;  // by default this setting is set to false
     private final List<String> affectedIndexes = new ArrayList<String>();
 
     public int getTotalOutstanding()                           { return this.totalSent - (this.totalFailed + this.totalOk); }
@@ -114,6 +96,44 @@ public class ElasticsearchPersistWriter 
     public ElasticsearchPersistWriter() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
         this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public ElasticsearchPersistWriter(Queue<StreamsDatum> persistQueue) {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.persistQueue = persistQueue;
+    }
+
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+        this.config = config;
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue) {
+        this.config = config;
+        this.persistQueue = persistQueue;
+    }
+
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index) {
+        this.config = config;
+        this.persistQueue = persistQueue;
+        this.index = index;
+    }
+
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type) {
+        this.config = config;
+        this.persistQueue = persistQueue;
+        this.index = index;
+        this.type = type;
+    }
+
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type, boolean veryLargeBulk) {
+        this.config = config;
+        this.persistQueue = persistQueue;
+        this.index = index;
+        this.type = type;
+        this.veryLargeBulk = veryLargeBulk;
     }
 
     private static final int  BYTES_IN_MB = 1024*1024;
@@ -124,15 +144,33 @@ public class ElasticsearchPersistWriter 
     public boolean isConnected() 		                { return (client != null); }
 
     @Override
-    public void prepare(Object o) {
+    public void write(StreamsDatum streamsDatum) {
+
+        String json;
+        try {
+
+            json = mapper.writeValueAsString(streamsDatum.getDocument());
+
+            add(index, type, null, json);
+
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+
+        }
+    }
+
+    @Override
+    public void start() {
+
         manager = new ElasticsearchClientManager(config);
         client = manager.getClient();
 
         LOGGER.info(client.toString());
+
     }
 
     @Override
-    public void cleanUp() {
+    public void stop() {
 
         try {
             flush();
@@ -143,19 +181,34 @@ public class ElasticsearchPersistWriter 
     }
 
     @Override
-    public void write(StreamsDatum streamsDatum) {
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
 
-        String json;
-        try {
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return persistQueue;
+    }
 
-            json = mapper.writeValueAsString(streamsDatum.getDocument());
 
-            add(index, type, null, json);
+    @Override
+    public void run() {
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+        start();
+
+        task = new Thread(new ElasticsearchPersistWriterTask(this));
+
+        task.start();
 
+        try {
+            task.join(60000);
+        } catch (InterruptedException e) {
+            stop();
+            return;
         }
+        stop();
+        return;
+
     }
 
     @Override
@@ -526,5 +579,4 @@ public class ElasticsearchPersistWriter 
 
         return toReturn;
     }
-
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml Mon Feb 24 16:33:34 2014
@@ -1,20 +1,25 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
-    <output url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes" />
-    <output-test url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes" />
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
     <content url="file://$MODULE_DIR$">
       <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jaxb2" isTestSource="false" generated="true" />
-      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
       <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
       <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
       <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
       <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
-      <excludeFolder url="file://$MODULE_DIR$/target" />
+      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
+      <excludeFolder url="file://$MODULE_DIR$/target/classes" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
+      <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
+      <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
-    <orderEntry type="library" name="Maven: com.typesafe:config:1.2.0" level="project" />
+    <orderEntry type="library" name="Maven: com.typesafe:config:1.0.2" level="project" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
@@ -27,13 +32,13 @@
     <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
     <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
     <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
-    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.6.1" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
     <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.streams:streams-core:0.1-SNAPSHOT" level="project" />
+    <orderEntry type="module" module-name="streams-core (2)" />
     <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
     <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.streams:streams-pojo:0.1-SNAPSHOT" level="project" />
+    <orderEntry type="module" module-name="streams-pojo (2)" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
@@ -49,8 +54,8 @@
     <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
     <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
     <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.streams:streams-config:0.1-SNAPSHOT" level="project" />
-    <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+    <orderEntry type="module" module-name="streams-config (2)" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
     <orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" />

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Mon Feb 24 16:33:34 2014
@@ -123,12 +123,9 @@ public class TwitterEventProcessor imple
                 LOGGER.debug("DELETE");
                 result = mapper.convertValue(event, Delete.class);
             }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
         }
 
-            // no supported conversion were applied
+        // no supported conversion were applied
         if( result != null )
             return result;
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Mon Feb 24 16:33:34 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterStreamProvider implements StreamsProvider, Serializable {
+public class TwitterStreamProvider implements StreamsProvider, Serializable, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
@@ -77,44 +77,15 @@ public class TwitterStreamProvider imple
         Config config = StreamsConfigurator.config.getConfig("twitter");
         this.config = TwitterStreamConfigurator.detectConfiguration(config);
         this.klass = klass;
-        providerQueue = new LinkedBlockingQueue<StreamsDatum>();
     }
 
     public TwitterStreamProvider(TwitterStreamConfiguration config, Class klass) {
         this.config = config;
         this.klass = klass;
-        providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    }
-
-    public void run() {
-
-        for (int i = 0; i < 10; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
-
-        new Thread(new TwitterStreamProviderTask(this)).start();
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        run();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
     }
 
     @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public void prepare(Object o) {
+    public void start() {
 
         Preconditions.checkNotNull(this.klass);
 
@@ -150,12 +121,57 @@ public class TwitterStreamProvider imple
                 .authentication(auth)
                 .processor(new StringDelimitedProcessor(inQueue))
                 .build();
+
+        for (int i = 0; i < 10; i++) {
+            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
+        }
+
+        new Thread(new TwitterStreamProviderTask(this)).start();
     }
 
     @Override
-    public void cleanUp() {
+    public void stop() {
         for (int i = 0; i < 10; i++) {
             inQueue.add(TwitterEventProcessor.TERMINATE);
         }
     }
+
+    @Override
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    public void setProviderQueue(Queue<StreamsDatum> providerQueue) {
+        this.providerQueue = providerQueue;
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public void run() {
+
+        start();
+
+        while( !executor.isTerminated()) {
+            try {
+                executor.awaitTermination(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) { }
+        }
+
+        stop();
+    }
+
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Mon Feb 24 16:33:34 2014
@@ -14,7 +14,6 @@ import org.apache.streams.twitter.Twitte
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
 import twitter4j.conf.ConfigurationBuilder;
@@ -29,7 +28,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
@@ -88,7 +87,8 @@ public class TwitterTimelineProvider imp
         this.klass = klass;
     }
 
-    public void run() {
+    @Override
+    public void start() {
 
         Preconditions.checkNotNull(providerQueue);
 
@@ -133,26 +133,51 @@ public class TwitterTimelineProvider imp
     }
 
     @Override
+    public void stop() {
+        for (int i = 0; i < 1; i++) {
+            inQueue.add(TwitterEventProcessor.TERMINATE);
+        }
+
+        shutdownAndAwaitTermination(executor);
+    }
+
+    @Override
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
     public StreamsResultSet readCurrent() {
-        run();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
+        return null;
     }
 
     @Override
     public StreamsResultSet readNew(BigInteger sequence) {
-        throw new NotImplementedException();
+        return null;
     }
 
     @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         this.start = start;
         this.end = end;
-        run();
+        start();
         StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
         return result;
     }
 
+    @Override
+    public void run() {
+
+        start();
+
+        while( !providerTaskComplete.isDone()) {
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) { }
+        }
+
+        stop();
+    }
     void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {
@@ -172,52 +197,4 @@ public class TwitterTimelineProvider imp
     }
 
 
-    @Override
-    public void prepare(Object o) {
-
-        Preconditions.checkNotNull(providerQueue);
-
-        Preconditions.checkNotNull(this.klass);
-
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
-        Preconditions.checkNotNull(config.getFollow());
-
-        Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
-        Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
-        Iterator<Long> ids = config.getFollow().iterator();
-        while( ids.hasNext() ) {
-            Long id = ids.next();
-
-            String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                    .setJSONStoreEnabled(jsonStoreEnabled)
-                    .setAsyncNumThreads(3)
-                    .setRestBaseURL(baseUrl);
-
-            Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-            providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
-        }
-
-        for (int i = 0; i < 1; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -107,6 +107,10 @@ public abstract class TwitterJsonEventAc
         return provider;
     }
 
+    public static List<Object> getLinks(ObjectNode event) {
+        return null;
+    }
+
     public static String getUrls(ObjectNode event) {
         return null;
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -2,18 +2,15 @@ package org.apache.streams.twitter.seria
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
@@ -47,8 +44,6 @@ public class TwitterJsonRetweetActivityS
         activity.setProvider(buildProvider(event));
         activity.setTitle("");
         activity.setContent(retweet.getRetweetedStatus().getText());
-        activity.setUrl(getUrls(event));
-        activity.setLinks(getLinks(retweet));
         addTwitterExtension(activity, event);
         addLocationExtension(activity, retweet);
         return activity;
@@ -73,14 +68,6 @@ public class TwitterJsonRetweetActivityS
         return actObj;
     }
 
-    public static List<Object> getLinks(Retweet retweet) {
-        List<Object> links = Lists.newArrayList();
-        for( Url url : retweet.getRetweetedStatus().getEntities().getUrls() ) {
-            links.add(url.getExpandedUrl());
-        }
-        return links;
-    }
-
     public static void addLocationExtension(Activity activity, Retweet retweet) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = new HashMap<String, Object>();

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -2,17 +2,14 @@ package org.apache.streams.twitter.seria
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
@@ -48,7 +45,7 @@ public class TwitterJsonTweetActivitySer
         activity.setTitle("");
         activity.setContent(tweet.getText());
         activity.setUrl(getUrls(event));
-        activity.setLinks(getLinks(tweet));
+        activity.setLinks(getLinks(event));
         addTwitterExtension(activity, event);
         addLocationExtension(activity, tweet);
         return activity;
@@ -73,14 +70,6 @@ public class TwitterJsonTweetActivitySer
         return actObj;
     }
 
-    public static List<Object> getLinks(Tweet tweet) {
-        List<Object> links = Lists.newArrayList();
-        for( Url url : tweet.getEntities().getUrls() ) {
-            links.add(url.getExpandedUrl());
-        }
-        return links;
-    }
-
     public static ActivityObject buildTarget(Tweet tweet) {
         return null;
     }

Modified: incubator/streams/trunk/streams-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/pom.xml (original)
+++ incubator/streams/trunk/streams-core/pom.xml Mon Feb 24 16:33:34 2014
@@ -26,6 +26,7 @@
         <version>0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
+
     <artifactId>streams-core</artifactId>
     <packaging>jar</packaging>
 
@@ -34,11 +35,6 @@
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-util</artifactId>
-            <version>${project.version}</version>
-        </dependency>
     </dependencies>
 
     <build>
@@ -55,4 +51,4 @@
             </testResource>
         </testResources>
     </build>
-</project>
+</project>
\ No newline at end of file

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Mon Feb 24 16:33:34 2014
@@ -95,21 +95,4 @@ public class StreamsDatum implements Ser
     public void setDocument(Object document) {
         this.document = document;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if(o instanceof StreamsDatum) {
-            StreamsDatum that = (StreamsDatum) o;
-            if(this.document != null && this.document.equals(that.document)) {
-                return (this.timestamp != null ? this.timestamp.equals(that.timestamp) : that.timestamp == null) &&
-                        (this.sequenceid != null ? this.sequenceid.equals(that.sequenceid) : that.sequenceid == null);
-            }
-            else {
-                return that.document == null && this.document == null;
-            }
-        }
-        else {
-            return false;
-        }
-    }
 }

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java Mon Feb 24 16:33:34 2014
@@ -25,22 +25,18 @@ import java.math.BigInteger;
 import java.util.Queue;
 
 /**
- *
- * Currently a duplicate interface.  Has exact same methods as StreamsProvider.
- * Future work should make this interface necessary I'm told.
- *
+ * Created by sblackmon on 12/13/13.
  */
-public interface StreamsPersistReader extends StreamsProvider {
+public interface StreamsPersistReader {
 
-//    void start();
-//    void stop();
-//
-//    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
-//    public Queue<StreamsDatum> getPersistQueue();
+    void start();
+    void stop();
 
-//    public StreamsResultSet readAll();
-//    public StreamsResultSet readNew(BigInteger sequence);
-//    public StreamsResultSet readRange(DateTime start, DateTime end);
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+    public Queue<StreamsDatum> getPersistQueue();
 
+    public StreamsResultSet readAll();
+    public StreamsResultSet readNew(BigInteger sequence);
+    public StreamsResultSet readRange(DateTime start, DateTime end);
 
 }

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java Mon Feb 24 16:33:34 2014
@@ -24,12 +24,14 @@ import java.util.Queue;
 /**
  * Created by sblackmon on 12/13/13.
  */
-public interface StreamsPersistWriter extends StreamsOperation{
+public interface StreamsPersistWriter {
+
+    void start();
+    void stop();
+
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+    public Queue<StreamsDatum> getPersistQueue();
 
-    /**
-     * Persist the StreamsDatum to the corresponding data store.
-     * @param entry to be stored.
-     */
     public void write( StreamsDatum entry );
 
 }

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Mon Feb 24 16:33:34 2014
@@ -24,16 +24,17 @@ import java.util.Queue;
 /**
  * Created by sblackmon on 12/13/13.
  */
-public interface StreamsProcessor extends StreamsOperation{
+public interface StreamsProcessor {
 
+    void start();
+    void stop();
 
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue);
+    public Queue<StreamsDatum> getProcessorInputQueue();
+
+    public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
+    public Queue<StreamsDatum> getProcessorOutputQueue();
 
-    /**
-     * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will
-     * passed to every down stream operation that reads from this processor.
-     * @param entry StreamsDatum to be process
-     * @return resulting StreamDatums from process. Should never be null or contain null object.  Empty list OK.
-     */
     public List<StreamsDatum> process( StreamsDatum entry );
 
 }

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Mon Feb 24 16:33:34 2014
@@ -26,12 +26,12 @@ import java.util.Queue;
 /**
  * Created by sblackmon on 12/13/13.
  */
-public interface StreamsProvider extends StreamsOperation {
+public interface StreamsProvider {
 
-//    void start();
-//    void stop();
-//
-//    public Queue<StreamsDatum> getProviderQueue();
+    void start();
+    void stop();
+
+    public Queue<StreamsDatum> getProviderQueue();
 
     public StreamsResultSet readCurrent();
     public StreamsResultSet readNew(BigInteger sequence);