You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/05 22:06:51 UTC

svn commit: r1574660 - in /incubator/streams/branches/STREAMS-26: ./ streams-contrib/ streams-contrib/streams-components-test/ streams-contrib/streams-components-test/src/ streams-contrib/streams-components-test/src/main/ streams-contrib/streams-compon...

Author: sblackmon
Date: Wed Mar  5 21:06:50 2014
New Revision: 1574660

URL: http://svn.apache.org/r1574660
Log:
merging ryan and steve latest

Added:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/streams/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/streams/test/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/resources/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json
    incubator/streams/branches/STREAMS-26/streams-core/src/test/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java
    incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java
Modified:
    incubator/streams/branches/STREAMS-26/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
    incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml

Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Wed Mar  5 21:06:50 2014
@@ -155,10 +155,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
-        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Wed Mar  5 21:06:50 2014
@@ -47,8 +47,9 @@
         <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>
         <!--<module>streams-provider-sysomos</module>-->
-        <module>streams-provider-rss</module>
+        <!--<module>streams-provider-rss</module>-->
         <!--<module>streams-proxy-semantria</module>-->
+        <module>streams-components-test</module>
     </modules>
 
     <dependencyManagement>

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml Wed Mar  5 21:06:50 2014
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-components-test</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+
+        <build>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>2.4</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>test-jar</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </build>
+
+
+</project>
\ No newline at end of file

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,54 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Created by rebanks on 2/27/14.
+ */
+public class ExpectedDatumsPersistWriter implements StreamsPersistWriter{
+
+    private StreamsDatumConverter converter;
+    private String fileName;
+    private List<StreamsDatum> expectedDatums;
+    private int counted = 0;
+    private int expectedSize = 0;
+
+    public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
+        this.converter = converter;
+        this.fileName = filePathInResources;
+    }
+
+
+
+    @Override
+    public void write(StreamsDatum entry) {
+        int index = this.expectedDatums.indexOf(entry);
+        assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
+        this.expectedDatums.remove(index);
+        ++this.counted;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
+        this.expectedDatums = new LinkedList<StreamsDatum>();
+        while(scanner.hasNextLine()) {
+            this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
+        }
+        this.expectedSize = this.expectedDatums.size();
+    }
+
+    @Override
+    public void cleanUp() {
+        assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,98 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.sql.ResultSet;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Scanner;
+
+/**
+ * FOR TESTING PURPOSES ONLY.
+ *
+ * The StreamProvider reads from a File or InputStream.  Each line of the file will be emitted as the document of a
+ * streams datum.
+ *
+ */
+public class FileReaderProvider implements StreamsProvider {
+
+    private String fileName;
+    private InputStream inStream;
+    private Scanner scanner;
+    private StreamsDatumConverter converter;
+
+    public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
+        this.fileName = filePathInResources;
+        this.converter = converter;
+    }
+
+    @Override
+    public void startStream() {
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return new ResultSet();
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
+    }
+
+    @Override
+    public void cleanUp() {
+        this.scanner.close();
+    }
+
+    private class ResultSet extends StreamsResultSet {
+
+        public ResultSet() {
+            super(null);
+        }
+
+
+        @Override
+        public Iterator<StreamsDatum> iterator() {
+            return new FileProviderIterator();
+        }
+
+        private class FileProviderIterator implements Iterator<StreamsDatum> {
+
+
+
+            @Override
+            public boolean hasNext() {
+                return scanner.hasNextLine();
+            }
+
+            @Override
+            public StreamsDatum next() {
+                return converter.convert(scanner.nextLine());
+            }
+
+            @Override
+            public void remove() {
+
+            }
+        }
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,13 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.io.Serializable;
+
+/**
+ * Created by rebanks on 2/27/14.
+ */
+public interface StreamsDatumConverter extends Serializable {
+
+    public StreamsDatum convert(String s);
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,15 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class StringToDocumentConverter implements StreamsDatumConverter {
+
+    @Override
+    public StreamsDatum convert(String s) {
+        return new StreamsDatum(s);
+    }
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.builders.LocalStreamBuilder;
+import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
+import org.apache.streams.test.component.FileReaderProvider;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Test;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestCompoentsLocalStream {
+
+    @Test
+    public void testLocalStreamWithComponent() {
+        LocalStreamBuilder builder = new LocalStreamBuilder();
+        builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
+                                                                        new StringToDocumentConverter()));
+        builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
+                "/TestFile.txt"), 1, "provider")
+        .start();
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestExpectedDatumsPersitWriter {
+
+    private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
+            new StreamsDatum("Document1"),
+            new StreamsDatum("Document2"),
+            new StreamsDatum("Document3"),
+            new StreamsDatum("Document4")
+//            Uncomment to prove failures occur, or comment out a datum above
+//            ,new StreamsDatum("Document5")
+    };
+
+    @Test
+    public void testExpectedDatumsPersistWriterFileName() {
+        testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
+    }
+
+
+
+    private void testDatums(ExpectedDatumsPersistWriter writer) {
+        writer.prepare(null);
+        for(StreamsDatum datum : INPUT_DATUMS) {
+            writer.write(datum);
+        }
+        writer.cleanUp();
+    }
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,39 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.test.component.FileReaderProvider;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.InputStream;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestFileReaderProvider {
+
+
+    @Test
+    public void testFileReaderProviderFileName() {
+        String fileName = "/TestFile.txt";
+        FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
+        provider.prepare(null);
+        StreamsResultSet resultSet = provider.readCurrent();
+        int count = 0;
+        for(StreamsDatum datum : resultSet) {
+            ++count;
+        }
+        assertEquals(4, count);
+        provider.cleanUp();
+    }
+
+
+
+
+
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt Wed Mar  5 21:06:50 2014
@@ -0,0 +1,4 @@
+Document1
+Document2
+Document3
+Document4
\ No newline at end of file

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml Wed Mar  5 21:06:50 2014
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+      <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="module" module-name="streams-core (3)" />
+    <orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" />
+    <orderEntry type="module" module-name="streams-util (3)" />
+    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
+    <orderEntry type="module" module-name="streams-pojo (3)" />
+    <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.codemodel:codemodel:2.6" level="project" />
+    <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
+    <orderEntry type="library" name="Maven: javax.validation:validation-api:1.0.0.GA" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.11" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
+    <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
+    <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml:jackson-xml-databind:0.5.0" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.0" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml:aalto-xml:0.9.9" level="project" />
+    <orderEntry type="library" name="Maven: nz.net.ultraq.jaxb:jaxb-utilities:1.2.6" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
+    <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: org.json:json:20090211" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
+  </component>
+  <component name="POM File Configuration" pomFile="" />
+</module>
+

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Wed Mar  5 21:06:50 2014
@@ -44,6 +44,7 @@ public class ConsolePersistWriter implem
             String text = mapper.writeValueAsString(entry);
 
             System.out.println(text);
+//            LOGGER.info(text);
 
         } catch (JsonProcessingException e) {
             LOGGER.warn("save: {}", e);

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Wed Mar  5 21:06:50 2014
@@ -74,6 +74,7 @@
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java Wed Mar  5 21:06:50 2014
@@ -3,7 +3,9 @@ package org.apache.streams.elasticsearch
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -37,17 +39,19 @@ import java.util.concurrent.*;
 
 public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>
 {
+    public final static String STREAMS_ID = "ElasticsearchPersistReader";
+
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> persistQueue;
 
     private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
     private static final Integer DEFAULT_BATCH_SIZE = 500;
     private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
 
     private ElasticsearchClientManager elasticsearchClientManager;
-    private String[] indexes;
-    private String[] types;
+    private List<String> indexes = Lists.newArrayList();
+    private List<String> types = Lists.newArrayList();
     private String[] withfields;
     private String[] withoutfields;
     private DateTime startDate;
@@ -62,6 +66,8 @@ public class ElasticsearchPersistReader 
 
     private ElasticsearchConfiguration config;
 
+    private ExecutorService executor;
+
     private QueryBuilder queryBuilder;
     private FilterBuilder filterBuilder;
 
@@ -97,42 +103,29 @@ public class ElasticsearchPersistReader 
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
         this.config = ElasticsearchConfigurator.detectConfiguration(config);
     }
-    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-    }
-
-    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes) {
+    public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
         this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-        this.indexes = indexes;
-    }
-
-    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-        this.indexes = indexes;
-        this.types = types;
-    }
-
-    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types, String[] withfields, String[] withoutfields) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-        this.indexes = indexes;
-        this.types = types;
-        this.withfields = withfields;
-        this.withoutfields = withoutfields;
+        indexes.add(elasticsearchConfiguration.getIndex());
+        types.add(elasticsearchConfiguration.getType());
     }
 
     @Override
     public void startStream() {
-
+        LOGGER.debug("startStream");
+        executor = Executors.newSingleThreadExecutor();
+        executor.submit(new ElasticsearchPersistReaderTask(this));
     }
 
     @Override
     public void prepare(Object o) {
 
+        persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
         // If we haven't already set up the search, then set up the search.
         if(search == null)
         {
             search = elasticsearchClientManager.getClient()
-                    .prepareSearch(indexes)
+                    .prepareSearch(indexes.toArray(new String[0]))
                     .setSearchType(SearchType.SCAN)
                     .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
                     .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
@@ -141,8 +134,8 @@ public class ElasticsearchPersistReader 
                 search.setQuery(this.queryBuilder);
 
             // If the types are null, then don't specify a type
-            if(this.types != null && this.types.length > 0)
-                search = search.setTypes(types);
+            if(this.types != null && this.types.size() > 0)
+                search = search.setTypes(types.toArray(new String[0]));
 
             Integer clauses = 0;
             if(this.withfields != null || this.withoutfields != null) {
@@ -295,26 +288,17 @@ public class ElasticsearchPersistReader 
     @Override
     public StreamsResultSet readCurrent() {
 
-        Queue<StreamsDatum> currentQueue = new LinkedBlockingQueue<StreamsDatum>();
-        while( hasNext()) {
-            SearchHit hit = next();
-            ObjectNode jsonObject = null;
-            try {
-                jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-                break;
-            }
-            StreamsDatum item = new StreamsDatum(jsonObject);
-            item.getMetadata().put("id", hit.getId());
-            item.getMetadata().put("index", hit.getIndex());
-            item.getMetadata().put("type", hit.getType());
-            currentQueue.add(item);
-        }
+        LOGGER.debug("readCurrent: {}", persistQueue.size());
+
+        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+        Iterators.addAll(currentIterator, persistQueue.iterator());
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
 
-        cleanUp();
+        persistQueue.clear();
+
+        return current;
 
-        return (StreamsResultSet)currentQueue;
     }
 
     public StreamsResultSet readAll() {
@@ -353,6 +337,7 @@ public class ElasticsearchPersistReader 
     public void cleanUp() {
         LOGGER.info("PersistReader done");
     }
+
 }
 
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java Wed Mar  5 21:06:50 2014
@@ -25,27 +25,25 @@ public class ElasticsearchPersistReaderT
     @Override
     public void run() {
 
-        while(true) {
-            StreamsDatum item;
-            while( reader.hasNext()) {
-                SearchHit hit = reader.next();
-                ObjectNode jsonObject = null;
-                try {
-                    jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    break;
-                }
-                item = new StreamsDatum(jsonObject);
-                item.getMetadata().put("id", hit.getId());
-                item.getMetadata().put("index", hit.getIndex());
-                item.getMetadata().put("type", hit.getType());
-                reader.persistQueue.offer(item);
-            }
+        StreamsDatum item;
+        while( reader.hasNext()) {
+            SearchHit hit = reader.next();
+            ObjectNode jsonObject = null;
             try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
+                jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+            } catch (IOException e) {
+                e.printStackTrace();
+                break;
+            }
+            item = new StreamsDatum(jsonObject);
+            item.getMetadata().put("id", hit.getId());
+            item.getMetadata().put("index", hit.getIndex());
+            item.getMetadata().put("type", hit.getType());
+            reader.persistQueue.offer(item);
         }
+        try {
+            Thread.sleep(new Random().nextInt(100));
+        } catch (InterruptedException e) {}
 
     }
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Wed Mar  5 21:06:50 2014
@@ -10,6 +10,7 @@ import org.apache.streams.config.Streams
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.pojo.json.Activity;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -123,13 +124,14 @@ public class ElasticsearchPersistWriter 
 
         String json;
         try {
-
+            String id = streamsDatum.getId();
             if( streamsDatum.getDocument() instanceof String )
                 json = streamsDatum.getDocument().toString();
-            else
+            else {
                 json = mapper.writeValueAsString(streamsDatum.getDocument());
+            }
 
-            add(config.getIndex(), config.getType(), null, json);
+            add(config.getIndex(), config.getType(), id, json);
 
         } catch (JsonProcessingException e) {
             LOGGER.warn("{} {}", e.getLocation(), e.getMessage());

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json Wed Mar  5 21:06:50 2014
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration",
+    "extends": {"$ref":"ElasticsearchConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "index": {
+            "type": "string",
+            "description": "Index to write to"
+        },
+        "type": {
+            "type": "string",
+            "description": "Type to write as"
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Wed Mar  5 21:06:50 2014
@@ -101,4 +101,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json Wed Mar  5 21:06:50 2014
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.hdfs.HdfsReaderConfiguration",
+    "extends": {"$ref":"HdfsConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "readerPath": {
+            "type": "string",
+            "description": "Path below root path"
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Wed Mar  5 21:06:50 2014
@@ -18,6 +18,7 @@
 
 package org.apache.streams.core;
 
+import org.apache.streams.pojo.json.Activity;
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
@@ -31,26 +32,36 @@ import java.util.Map;
 public class StreamsDatum implements Serializable {
 
     public StreamsDatum(Object document) {
-        this.document = document;
-        this.metadata = new HashMap<String, Object>();
+        this(document, null, null, null);
     }
 
-    public StreamsDatum(Object document, BigInteger sequenceid) {
+    public StreamsDatum(Object document, String id) {
+        this(document, id, null, null);
+    }
 
-        this.document = document;
-        this.sequenceid = sequenceid;
-        this.metadata = new HashMap<String, Object>();
+    public StreamsDatum(Object document, BigInteger sequenceid) {
+        this(document, null, null, sequenceid);
     }
 
     public StreamsDatum(Object document, DateTime timestamp) {
-
-        this.document = document;
-        this.timestamp = timestamp;
-        this.metadata = new HashMap<String, Object>();
+        this(document, null, timestamp, null);
     }
 
     public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) {
+        this(document, null, timestamp, sequenceid);
+    }
+
+    public StreamsDatum(Object document, String id, DateTime timestamp) {
+        this(document, id, timestamp, null);
+    }
+
+    public StreamsDatum(Object document, String id, BigInteger sequenceid) {
+        this(document, id, null, sequenceid);
+    }
+
+    public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid) {
         this.document = document;
+        this.id = id;
         this.timestamp = timestamp;
         this.sequenceid = sequenceid;
         this.metadata = new HashMap<String, Object>();
@@ -64,6 +75,8 @@ public class StreamsDatum implements Ser
 
     public Object document;
 
+    private String id;
+
     public DateTime getTimestamp() {
         return timestamp;
     }
@@ -96,6 +109,14 @@ public class StreamsDatum implements Ser
         this.document = document;
     }
 
+
+    public String getId(){
+        if(this.id == null && this.document instanceof Activity) {
+            return ((Activity)this.document).getId();
+        }
+        return id;
+    }
+
     @Override
     public boolean equals(Object o) {
         if(o instanceof StreamsDatum) {
@@ -112,4 +133,9 @@ public class StreamsDatum implements Ser
             return false;
         }
     }
+
+    @Override
+    public String toString() {
+        return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
+    }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java Wed Mar  5 21:06:50 2014
@@ -147,12 +147,12 @@ public class LocalStreamBuilder implemen
             }
 
             while(isRunning) {
-                //isRunning = false;
+                isRunning = false;
                 for(StreamsProviderTask task : provTasks.values()) {
                     isRunning = isRunning || task.isRunning();
                 }
                 if(isRunning) {
-                    Thread.sleep(10000);
+                    Thread.sleep(100000);
                 }
             }
             this.executor.shutdown();

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Wed Mar  5 21:06:50 2014
@@ -112,10 +112,8 @@ public abstract class BaseStreamsTask im
      */
     protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
         try {
-            if(datum.document instanceof Serializable) {
-                return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
-            }
-            else if(datum.document instanceof ObjectNode) {
+
+            if(datum.document instanceof ObjectNode) {
                 return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
             }
             else if(datum.document instanceof Activity) {
@@ -124,10 +122,14 @@ public abstract class BaseStreamsTask im
                                         datum.timestamp,
                                         datum.sequenceid);
             }
-            else if(this.mapper.canSerialize(datum.document.getClass())){
-                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
-                                        datum.timestamp,
-                                        datum.sequenceid);
+//            else if(this.mapper.canSerialize(datum.document.getClass())){
+//                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
+//                                        datum.timestamp,
+//                                        datum.sequenceid);
+//            }
+
+            else if(datum.document instanceof Serializable) {
+                return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
             }
         } catch (Exception e) {
             LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e);

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java Wed Mar  5 21:06:50 2014
@@ -12,7 +12,7 @@ import java.util.Queue;
  */
 public interface StreamsTask extends Runnable{
 
-    public static final long DEFAULT_SLEEP_TIME_MS = 500;
+    public static final long DEFAULT_SLEEP_TIME_MS = 5000;
 
     /**
      * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,161 @@
+package org.apache.streams.core.builders;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import org.apache.streams.core.test.writer.DatumCounterWriter;
+import org.apache.streams.core.test.writer.SystemOutWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.HashSet;
+import java.util.Scanner;
+
+/**
+ * Basic Tests for the LocalStreamBuilder.
+ *
+ * Test are performed by redirecting system out and counting the number of lines that the SystemOutWriter prints
+ * to System.out.  The SystemOutWriter also prints one line when cleanUp() is called, so this is why it tests for
+ * the numDatums +1.
+ *
+ *
+ */
+public class LocalStreamBuilderTest {
+
+    ByteArrayOutputStream out;
+
+    @Before
+    public void setSystemOut() {
+        out = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(out));
+    }
+
+    @Test
+    public void testStreamIdValidations() {
+        StreamBuilder builder = new LocalStreamBuilder();
+        builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+        Exception exp = null;
+        try {
+            builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+        } catch (RuntimeException e) {
+            exp = e;
+        }
+        assertNotNull(exp);
+        exp = null;
+        builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor(), 1, "id");
+        try {
+            builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor(), 1, "id", "id2");
+        } catch (RuntimeException e) {
+            exp = e;
+        }
+        assertNotNull(exp);
+    }
+
+    @Test
+    public void testBasicLinearStream1()  {
+        int numDatums = 1;
+        StreamBuilder builder = new LocalStreamBuilder();
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        SystemOutWriter writer = new SystemOutWriter();
+        builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+                .addStreamsProcessor("proc1", processor, 1, "sp1")
+                .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+        builder.start();
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertEquals(numDatums+1, count);
+    }
+
+    @Test
+    public void testBasicLinearStream2()  {
+        int numDatums = 1000;
+        StreamBuilder builder = new LocalStreamBuilder();
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        SystemOutWriter writer = new SystemOutWriter();
+        builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+                .addStreamsProcessor("proc1", processor, 1, "sp1")
+                .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+        builder.start();
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertEquals(numDatums+1, count);
+    }
+
+    @Test
+    public void testParallelLinearStream1() {
+        int numDatums = 10000;
+        int parallelHint = 40;
+        PassthroughDatumCounterProcessor.sawData = new HashSet<Integer>();
+        PassthroughDatumCounterProcessor.claimedNumber = new HashSet<Integer>();
+        StreamBuilder builder = new LocalStreamBuilder();
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        SystemOutWriter writer = new SystemOutWriter();
+        builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+                .addStreamsProcessor("proc1", processor, parallelHint, "sp1")
+                .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+        builder.start();
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertEquals(numDatums+1, count); //+1 is to make sure cleanup is called on the writer
+        assertEquals(parallelHint, PassthroughDatumCounterProcessor.claimedNumber.size()); //test 40 were initialized
+        assertTrue(PassthroughDatumCounterProcessor.sawData.size() > 1 && PassthroughDatumCounterProcessor.sawData.size() <= parallelHint); //test more than one processor got data
+    }
+
+    @Test
+    public void testBasicMergeStream() {
+        int numDatums1 = 1;
+        int numDatums2 = 1000;
+        PassthroughDatumCounterProcessor processor1 = new PassthroughDatumCounterProcessor();
+        PassthroughDatumCounterProcessor processor2 = new PassthroughDatumCounterProcessor();
+        SystemOutWriter writer = new SystemOutWriter();
+        StreamBuilder builder = new LocalStreamBuilder();
+        builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums1))
+                .newReadCurrentStream("sp2", new NumericMessageProvider(numDatums2))
+                .addStreamsProcessor("proc1", processor1, 1, "sp1")
+                .addStreamsProcessor("proc2", processor2, 1, "sp2")
+                .addStreamsPersistWriter("writer1", writer, 1, "proc1", "proc2");
+        builder.start();
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertEquals(numDatums1+numDatums2+1, count);
+    }
+
+    @Test
+    public void testBasicBranch() {
+        int numDatums = 1000;
+        StreamBuilder builder = new LocalStreamBuilder();
+        builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
+                .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+                .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "prov1")
+                .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1", "proc2");
+        builder.start();
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertEquals((numDatums*2)+1, count);
+    }
+
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,28 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.test.processors.DoNothingProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import org.apache.streams.core.test.writer.DoNothingWriter;
+import org.apache.streams.core.test.writer.SystemOutWriter;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class ToyLocalBuilderExample {
+
+    /**
+     * A simple example of how to run a stream in local mode.
+     * @param args
+     */
+    public static void main(String[] args) {
+        StreamBuilder builder = new LocalStreamBuilder(new LinkedBlockingQueue<StreamsDatum>());
+        builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
+                .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
+                .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
+        builder.start();
+    }
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,285 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import static org.junit.Assert.*;
+
+import org.apache.streams.core.test.writer.DatumCounterWriter;
+import org.junit.Test;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class BasicTasksTest {
+
+
+
+    @Test
+    public void testProviderTask() {
+        int numMessages = 100;
+        NumericMessageProvider provider = new NumericMessageProvider(numMessages);
+        StreamsProviderTask task = new StreamsProviderTask(provider, false);
+        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        task.addOutputQueue(outQueue);
+        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        Exception exp = null;
+        try {
+            task.addInputQueue(inQueue);
+        } catch (UnsupportedOperationException uoe) {
+            exp = uoe;
+        }
+        assertNotNull(exp);
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(outQueue.size() != numMessages) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                fail("Provider task failed to output "+numMessages+" in a timely fashion.");
+            }
+        }
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        };
+    }
+
+    @Test
+    public void testProcessorTask() {
+        int numMessages = 100;
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        StreamsProcessorTask task = new StreamsProcessorTask(processor);
+        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        task.addOutputQueue(outQueue);
+        task.addInputQueue(inQueue);
+        assertEquals(numMessages, task.getInputQueues().get(0).size());
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(inQueue.size() != 0 && outQueue.size() != numMessages) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+            }
+        }
+        task.stopTask();
+        assertEquals(numMessages, processor.getMessageCount());
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        }
+    }
+
+    @Test
+    public void testWriterTask() {
+        int numMessages = 100;
+        DatumCounterWriter writer = new DatumCounterWriter();
+        StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+
+        Exception exp = null;
+        try {
+            task.addOutputQueue(outQueue);
+        } catch (UnsupportedOperationException uoe) {
+            exp = uoe;
+        }
+        assertNotNull(exp);
+        task.addInputQueue(inQueue);
+        assertEquals(numMessages, task.getInputQueues().get(0).size());
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(inQueue.size() != 0 ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+            }
+        }
+        task.stopTask();
+        assertEquals(numMessages, writer.getDatumsCounted());
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        }
+    }
+
+    @Test
+    public void testMergeTask() {
+        int numMessages = 100;
+        int incoming = 5;
+        StreamsMergeTask task = new StreamsMergeTask();
+        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        task.addOutputQueue(outQueue);
+        for(int i=0; i < incoming; ++i) {
+            task.addInputQueue(createInputQueue(numMessages));
+        }
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(outQueue.size() != incoming * numMessages ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
+            }
+        }
+        task.stopTask();
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        }
+    }
+
+    @Test
+    public void testBranching() {
+        int numMessages = 100;
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        StreamsProcessorTask task = new StreamsProcessorTask(processor);
+        Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        task.addOutputQueue(outQueue1);
+        task.addOutputQueue(outQueue2);
+        task.addInputQueue(inQueue);
+        assertEquals(numMessages, task.getInputQueues().get(0).size());
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(inQueue.size() != 0 ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+            }
+        }
+        task.stopTask();
+
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        }
+        assertEquals(numMessages, processor.getMessageCount());
+        assertEquals(numMessages, outQueue1.size());
+        assertEquals(numMessages, outQueue2.size());
+    }
+
+    @Test
+    public void testBranchingSerialization() {
+        int numMessages = 1;
+        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+        StreamsProcessorTask task = new StreamsProcessorTask(processor);
+        Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
+        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        task.addOutputQueue(outQueue1);
+        task.addOutputQueue(outQueue2);
+        task.addInputQueue(inQueue);
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        service.submit(task);
+        int attempts = 0;
+        while(inQueue.size() != 0 ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            ++attempts;
+            if(attempts == 10) {
+                assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+            }
+        }
+        task.stopTask();
+
+        service.shutdown();
+        try {
+            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+                service.shutdownNow();
+                fail("Service did not terminate.");
+            }
+            assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+        } catch (InterruptedException e) {
+            fail("Test Interupted.");
+        }
+        assertEquals(numMessages, processor.getMessageCount());
+        assertEquals(numMessages, outQueue1.size());
+        assertEquals(numMessages, outQueue2.size());
+        StreamsDatum datum1 = outQueue1.poll();
+        StreamsDatum datum2 = outQueue2.poll();
+        assertNotNull(datum1);
+        assertEquals(datum1, datum2);
+        datum1.setDocument("a");
+        assertNotEquals(datum1, datum2);
+    }
+
+    private Queue<StreamsDatum> createInputQueue(int numDatums) {
+        Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
+        for(int i=0; i < numDatums; ++i) {
+            queue.add(new StreamsDatum(i));
+        }
+        return queue;
+    }
+
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,32 @@
+package org.apache.streams.core.test.processors;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class DoNothingProcessor implements StreamsProcessor {
+
+    List<StreamsDatum> result;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        this.result = new LinkedList<StreamsDatum>();
+        result.add(entry);
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+        System.out.println("Processor clean up!");
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,49 @@
+package org.apache.streams.core.test.processors;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.*;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class PassthroughDatumCounterProcessor implements StreamsProcessor {
+
+    public static Set<Integer> claimedNumber = new HashSet<Integer>();
+    public static final Random rand = new Random();
+    public static Set<Integer> sawData = new HashSet<Integer>();
+
+    private int count = 0;
+    private int id;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        ++this.count;
+        List<StreamsDatum> result = new LinkedList<StreamsDatum>();
+        result.add(entry);
+        synchronized (sawData) {
+            sawData.add(this.id);
+        }
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        synchronized (claimedNumber) {
+            this.id = rand.nextInt();
+            while(!claimedNumber.add(this.id)) {
+                this.id = rand.nextInt();
+            }
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    public int getMessageCount() {
+        return this.count;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,104 @@
+package org.apache.streams.core.test.providers;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
+ */
+public class NumericMessageProvider implements StreamsProvider {
+
+    private int numMessages;
+
+    public NumericMessageProvider(int numMessages) {
+        this.numMessages = numMessages;
+    }
+
+    @Override
+    public void startStream() {
+        // no op
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return new ResultSet();
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return new ResultSet();
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return new ResultSet();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+
+    private class ResultSet extends StreamsResultSet {
+
+        private ResultSet() {
+            super(new ConcurrentLinkedQueue<StreamsDatum>());
+        }
+
+//        @Override
+//        public long getStartTime() {
+//            return 0;
+//        }
+//
+//        @Override
+//        public long getEndTime() {
+//            return 0;
+//        }
+//
+//        @Override
+//        public String getSourceId() {
+//            return null;
+//        }
+//
+//        @Override
+//        public BigInteger getMaxSequence() {
+//            return null;
+//        }
+
+        @Override
+        public Iterator<StreamsDatum> iterator() {
+            return new Iterator<StreamsDatum>() {
+                private int i = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return i < numMessages;
+                }
+
+                @Override
+                public StreamsDatum next() {
+                    return new StreamsDatum(i++);
+                }
+
+                @Override
+                public void remove() {
+
+                }
+            };
+        }
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,31 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class DatumCounterWriter implements StreamsPersistWriter{
+
+    private int counter = 0;
+
+    @Override
+    public void write(StreamsDatum entry) {
+        ++this.counter;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+        System.out.println("clean up called");
+    }
+
+    public int getDatumsCounted() {
+        return this.counter;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,24 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class DoNothingWriter implements StreamsPersistWriter {
+    @Override
+    public void write(StreamsDatum entry) {
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+        System.out.println("Writer Clean Up!");
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java Wed Mar  5 21:06:50 2014
@@ -0,0 +1,24 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class SystemOutWriter implements StreamsPersistWriter {
+    @Override
+    public void write(StreamsDatum entry) {
+        System.out.println(entry.document);
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+        System.out.println("Clean up called writer!");
+    }
+}

Modified: incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml Wed Mar  5 21:06:50 2014
@@ -61,11 +61,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.fasterxml</groupId>
-            <artifactId>aalto-xml</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>