You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/05 22:06:51 UTC
svn commit: r1574660 - in /incubator/streams/branches/STREAMS-26: ./
streams-contrib/ streams-contrib/streams-components-test/
streams-contrib/streams-components-test/src/
streams-contrib/streams-components-test/src/main/
streams-contrib/streams-compon...
Author: sblackmon
Date: Wed Mar 5 21:06:50 2014
New Revision: 1574660
URL: http://svn.apache.org/r1574660
Log:
merging ryan and steve latest
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/streams/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/java/org/apache/streams/test/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/main/resources/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt
incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json
incubator/streams/branches/STREAMS-26/streams-core/src/test/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java
incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java
Modified:
incubator/streams/branches/STREAMS-26/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Wed Mar 5 21:06:50 2014
@@ -155,10 +155,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </dependency>
- <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Wed Mar 5 21:06:50 2014
@@ -47,8 +47,9 @@
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
<!--<module>streams-provider-sysomos</module>-->
- <module>streams-provider-rss</module>
+ <!--<module>streams-provider-rss</module>-->
<!--<module>streams-proxy-semantria</module>-->
+ <module>streams-components-test</module>
</modules>
<dependencyManagement>
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/pom.xml Wed Mar 5 21:06:50 2014
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-components-test</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,54 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Created by rebanks on 2/27/14.
+ */
+public class ExpectedDatumsPersistWriter implements StreamsPersistWriter{
+
+ private StreamsDatumConverter converter;
+ private String fileName;
+ private List<StreamsDatum> expectedDatums;
+ private int counted = 0;
+ private int expectedSize = 0;
+
+ public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
+ this.converter = converter;
+ this.fileName = filePathInResources;
+ }
+
+
+
+ @Override
+ public void write(StreamsDatum entry) {
+ int index = this.expectedDatums.indexOf(entry);
+ assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
+ this.expectedDatums.remove(index);
+ ++this.counted;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
+ this.expectedDatums = new LinkedList<StreamsDatum>();
+ while(scanner.hasNextLine()) {
+ this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
+ }
+ this.expectedSize = this.expectedDatums.size();
+ }
+
+ @Override
+ public void cleanUp() {
+ assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/FileReaderProvider.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,98 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.sql.ResultSet;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Scanner;
+
+/**
+ * FOR TESTING PURPOSES ONLY.
+ *
+ * The StreamProvider reads from a File or InputStream. Each line of the file will be emitted as the document of a
+ * streams datum.
+ *
+ */
+public class FileReaderProvider implements StreamsProvider {
+
+ private String fileName;
+ private InputStream inStream;
+ private Scanner scanner;
+ private StreamsDatumConverter converter;
+
+ public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
+ this.fileName = filePathInResources;
+ this.converter = converter;
+ }
+
+ @Override
+ public void startStream() {
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return new ResultSet();
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
+ }
+
+ @Override
+ public void cleanUp() {
+ this.scanner.close();
+ }
+
+ private class ResultSet extends StreamsResultSet {
+
+ public ResultSet() {
+ super(null);
+ }
+
+
+ @Override
+ public Iterator<StreamsDatum> iterator() {
+ return new FileProviderIterator();
+ }
+
+ private class FileProviderIterator implements Iterator<StreamsDatum> {
+
+
+
+ @Override
+ public boolean hasNext() {
+ return scanner.hasNextLine();
+ }
+
+ @Override
+ public StreamsDatum next() {
+ return converter.convert(scanner.nextLine());
+ }
+
+ @Override
+ public void remove() {
+
+ }
+ }
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,13 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.io.Serializable;
+
+/**
+ * Created by rebanks on 2/27/14.
+ */
+public interface StreamsDatumConverter extends Serializable {
+
+ public StreamsDatum convert(String s);
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,15 @@
+package org.apache.streams.test.component;
+
+import org.apache.streams.core.StreamsDatum;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class StringToDocumentConverter implements StreamsDatumConverter {
+
+ @Override
+ public StreamsDatum convert(String s) {
+ return new StreamsDatum(s);
+ }
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestCompoentsLocalStream.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.builders.LocalStreamBuilder;
+import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
+import org.apache.streams.test.component.FileReaderProvider;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Test;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestCompoentsLocalStream {
+
+ @Test
+ public void testLocalStreamWithComponent() {
+ LocalStreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
+ new StringToDocumentConverter()));
+ builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
+ "/TestFile.txt"), 1, "provider")
+ .start();
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestExpectedDatumsPersitWriter {
+
+ private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
+ new StreamsDatum("Document1"),
+ new StreamsDatum("Document2"),
+ new StreamsDatum("Document3"),
+ new StreamsDatum("Document4")
+// Uncomment to prove failures occur, or comment out a datum above
+// ,new StreamsDatum("Document5")
+ };
+
+ @Test
+ public void testExpectedDatumsPersistWriterFileName() {
+ testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
+ }
+
+
+
+ private void testDatums(ExpectedDatumsPersistWriter writer) {
+ writer.prepare(null);
+ for(StreamsDatum datum : INPUT_DATUMS) {
+ writer.write(datum);
+ }
+ writer.cleanUp();
+ }
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,39 @@
+package org.apache.streams.test.component.tests;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.test.component.FileReaderProvider;
+import org.apache.streams.test.component.StringToDocumentConverter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.InputStream;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by rebanks on 2/28/14.
+ */
+public class TestFileReaderProvider {
+
+
+ @Test
+ public void testFileReaderProviderFileName() {
+ String fileName = "/TestFile.txt";
+ FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
+ provider.prepare(null);
+ StreamsResultSet resultSet = provider.readCurrent();
+ int count = 0;
+ for(StreamsDatum datum : resultSet) {
+ ++count;
+ }
+ assertEquals(4, count);
+ provider.cleanUp();
+ }
+
+
+
+
+
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/src/test/resources/TestFile.txt Wed Mar 5 21:06:50 2014
@@ -0,0 +1,4 @@
+Document1
+Document2
+Document3
+Document4
\ No newline at end of file
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-components-test/streams-components-test.iml Wed Mar 5 21:06:50 2014
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <output-test url="file://$MODULE_DIR$/target/test-classes" />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="module" module-name="streams-core (3)" />
+ <orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" />
+ <orderEntry type="module" module-name="streams-util (3)" />
+ <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+ <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
+ <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
+ <orderEntry type="module" module-name="streams-pojo (3)" />
+ <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.codemodel:codemodel:2.6" level="project" />
+ <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
+ <orderEntry type="library" name="Maven: javax.validation:validation-api:1.0.0.GA" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.11" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml:jackson-xml-databind:0.5.0" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.0" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml:aalto-xml:0.9.9" level="project" />
+ <orderEntry type="library" name="Maven: nz.net.ultraq.jaxb:jaxb-utilities:1.2.6" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
+ <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: org.json:json:20090211" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
+ </component>
+ <component name="POM File Configuration" pomFile="" />
+</module>
+
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Wed Mar 5 21:06:50 2014
@@ -44,6 +44,7 @@ public class ConsolePersistWriter implem
String text = mapper.writeValueAsString(entry);
System.out.println(text);
+// LOGGER.info(text);
} catch (JsonProcessingException e) {
LOGGER.warn("save: {}", e);
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Wed Mar 5 21:06:50 2014
@@ -74,6 +74,7 @@
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java Wed Mar 5 21:06:50 2014
@@ -3,7 +3,9 @@ package org.apache.streams.elasticsearch
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -37,17 +39,19 @@ import java.util.concurrent.*;
public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>
{
+ public final static String STREAMS_ID = "ElasticsearchPersistReader";
+
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
- protected volatile Queue<StreamsDatum> persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ protected volatile Queue<StreamsDatum> persistQueue;
private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
private static final Integer DEFAULT_BATCH_SIZE = 500;
private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
private ElasticsearchClientManager elasticsearchClientManager;
- private String[] indexes;
- private String[] types;
+ private List<String> indexes = Lists.newArrayList();
+ private List<String> types = Lists.newArrayList();
private String[] withfields;
private String[] withoutfields;
private DateTime startDate;
@@ -62,6 +66,8 @@ public class ElasticsearchPersistReader
private ElasticsearchConfiguration config;
+ private ExecutorService executor;
+
private QueryBuilder queryBuilder;
private FilterBuilder filterBuilder;
@@ -97,42 +103,29 @@ public class ElasticsearchPersistReader
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
this.config = ElasticsearchConfigurator.detectConfiguration(config);
}
- public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration) {
- this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- }
-
- public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes) {
+ public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- this.indexes = indexes;
- }
-
- public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types) {
- this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- this.indexes = indexes;
- this.types = types;
- }
-
- public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types, String[] withfields, String[] withoutfields) {
- this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- this.indexes = indexes;
- this.types = types;
- this.withfields = withfields;
- this.withoutfields = withoutfields;
+ indexes.add(elasticsearchConfiguration.getIndex());
+ types.add(elasticsearchConfiguration.getType());
}
@Override
public void startStream() {
-
+ LOGGER.debug("startStream");
+ executor = Executors.newSingleThreadExecutor();
+ executor.submit(new ElasticsearchPersistReaderTask(this));
}
@Override
public void prepare(Object o) {
+ persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
// If we haven't already set up the search, then set up the search.
if(search == null)
{
search = elasticsearchClientManager.getClient()
- .prepareSearch(indexes)
+ .prepareSearch(indexes.toArray(new String[0]))
.setSearchType(SearchType.SCAN)
.setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
.setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
@@ -141,8 +134,8 @@ public class ElasticsearchPersistReader
search.setQuery(this.queryBuilder);
// If the types are null, then don't specify a type
- if(this.types != null && this.types.length > 0)
- search = search.setTypes(types);
+ if(this.types != null && this.types.size() > 0)
+ search = search.setTypes(types.toArray(new String[0]));
Integer clauses = 0;
if(this.withfields != null || this.withoutfields != null) {
@@ -295,26 +288,17 @@ public class ElasticsearchPersistReader
@Override
public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> currentQueue = new LinkedBlockingQueue<StreamsDatum>();
- while( hasNext()) {
- SearchHit hit = next();
- ObjectNode jsonObject = null;
- try {
- jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
- StreamsDatum item = new StreamsDatum(jsonObject);
- item.getMetadata().put("id", hit.getId());
- item.getMetadata().put("index", hit.getIndex());
- item.getMetadata().put("type", hit.getType());
- currentQueue.add(item);
- }
+ LOGGER.debug("readCurrent: {}", persistQueue.size());
+
+ Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+ Iterators.addAll(currentIterator, persistQueue.iterator());
+
+ StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
- cleanUp();
+ persistQueue.clear();
+
+ return current;
- return (StreamsResultSet)currentQueue;
}
public StreamsResultSet readAll() {
@@ -353,6 +337,7 @@ public class ElasticsearchPersistReader
public void cleanUp() {
LOGGER.info("PersistReader done");
}
+
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java Wed Mar 5 21:06:50 2014
@@ -25,27 +25,25 @@ public class ElasticsearchPersistReaderT
@Override
public void run() {
- while(true) {
- StreamsDatum item;
- while( reader.hasNext()) {
- SearchHit hit = reader.next();
- ObjectNode jsonObject = null;
- try {
- jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
- item = new StreamsDatum(jsonObject);
- item.getMetadata().put("id", hit.getId());
- item.getMetadata().put("index", hit.getIndex());
- item.getMetadata().put("type", hit.getType());
- reader.persistQueue.offer(item);
- }
+ StreamsDatum item;
+ while( reader.hasNext()) {
+ SearchHit hit = reader.next();
+ ObjectNode jsonObject = null;
try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
+ jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ break;
+ }
+ item = new StreamsDatum(jsonObject);
+ item.getMetadata().put("id", hit.getId());
+ item.getMetadata().put("index", hit.getIndex());
+ item.getMetadata().put("type", hit.getType());
+ reader.persistQueue.offer(item);
}
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) {}
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Wed Mar 5 21:06:50 2014
@@ -10,6 +10,7 @@ import org.apache.streams.config.Streams
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.pojo.json.Activity;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -123,13 +124,14 @@ public class ElasticsearchPersistWriter
String json;
try {
-
+ String id = streamsDatum.getId();
if( streamsDatum.getDocument() instanceof String )
json = streamsDatum.getDocument().toString();
- else
+ else {
json = mapper.writeValueAsString(streamsDatum.getDocument());
+ }
- add(config.getIndex(), config.getType(), null, json);
+ add(config.getIndex(), config.getType(), id, json);
} catch (JsonProcessingException e) {
LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json Wed Mar 5 21:06:50 2014
@@ -0,0 +1,18 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration",
+ "extends": {"$ref":"ElasticsearchConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "index": {
+ "type": "string",
+ "description": "Index to write to"
+ },
+ "type": {
+ "type": "string",
+ "description": "Type to write as"
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Wed Mar 5 21:06:50 2014
@@ -101,4 +101,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json Wed Mar 5 21:06:50 2014
@@ -0,0 +1,14 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.hdfs.HdfsReaderConfiguration",
+ "extends": {"$ref":"HdfsConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "readerPath": {
+ "type": "string",
+ "description": "Path below root path"
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Wed Mar 5 21:06:50 2014
@@ -18,6 +18,7 @@
package org.apache.streams.core;
+import org.apache.streams.pojo.json.Activity;
import org.joda.time.DateTime;
import java.io.Serializable;
@@ -31,26 +32,36 @@ import java.util.Map;
public class StreamsDatum implements Serializable {
public StreamsDatum(Object document) {
- this.document = document;
- this.metadata = new HashMap<String, Object>();
+ this(document, null, null, null);
}
- public StreamsDatum(Object document, BigInteger sequenceid) {
+ public StreamsDatum(Object document, String id) {
+ this(document, id, null, null);
+ }
- this.document = document;
- this.sequenceid = sequenceid;
- this.metadata = new HashMap<String, Object>();
+ public StreamsDatum(Object document, BigInteger sequenceid) {
+ this(document, null, null, sequenceid);
}
public StreamsDatum(Object document, DateTime timestamp) {
-
- this.document = document;
- this.timestamp = timestamp;
- this.metadata = new HashMap<String, Object>();
+ this(document, null, timestamp, null);
}
public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) {
+ this(document, null, timestamp, sequenceid);
+ }
+
+ public StreamsDatum(Object document, String id, DateTime timestamp) {
+ this(document, id, timestamp, null);
+ }
+
+ public StreamsDatum(Object document, String id, BigInteger sequenceid) {
+ this(document, id, null, sequenceid);
+ }
+
+ public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid) {
this.document = document;
+ this.id = id;
this.timestamp = timestamp;
this.sequenceid = sequenceid;
this.metadata = new HashMap<String, Object>();
@@ -64,6 +75,8 @@ public class StreamsDatum implements Ser
public Object document;
+ private String id;
+
public DateTime getTimestamp() {
return timestamp;
}
@@ -96,6 +109,14 @@ public class StreamsDatum implements Ser
this.document = document;
}
+
+ public String getId(){
+ if(this.id == null && this.document instanceof Activity) {
+ return ((Activity)this.document).getId();
+ }
+ return id;
+ }
+
@Override
public boolean equals(Object o) {
if(o instanceof StreamsDatum) {
@@ -112,4 +133,9 @@ public class StreamsDatum implements Ser
return false;
}
}
+
+ @Override
+ public String toString() {
+ return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
+ }
}
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java Wed Mar 5 21:06:50 2014
@@ -147,12 +147,12 @@ public class LocalStreamBuilder implemen
}
while(isRunning) {
- //isRunning = false;
+ isRunning = false;
for(StreamsProviderTask task : provTasks.values()) {
isRunning = isRunning || task.isRunning();
}
if(isRunning) {
- Thread.sleep(10000);
+ Thread.sleep(100000);
}
}
this.executor.shutdown();
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Wed Mar 5 21:06:50 2014
@@ -112,10 +112,8 @@ public abstract class BaseStreamsTask im
*/
protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
try {
- if(datum.document instanceof Serializable) {
- return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
- }
- else if(datum.document instanceof ObjectNode) {
+
+ if(datum.document instanceof ObjectNode) {
return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
}
else if(datum.document instanceof Activity) {
@@ -124,10 +122,14 @@ public abstract class BaseStreamsTask im
datum.timestamp,
datum.sequenceid);
}
- else if(this.mapper.canSerialize(datum.document.getClass())){
- return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
- datum.timestamp,
- datum.sequenceid);
+// else if(this.mapper.canSerialize(datum.document.getClass())){
+// return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
+// datum.timestamp,
+// datum.sequenceid);
+// }
+
+ else if(datum.document instanceof Serializable) {
+ return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
}
} catch (Exception e) {
LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e);
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java Wed Mar 5 21:06:50 2014
@@ -12,7 +12,7 @@ import java.util.Queue;
*/
public interface StreamsTask extends Runnable{
- public static final long DEFAULT_SLEEP_TIME_MS = 500;
+ public static final long DEFAULT_SLEEP_TIME_MS = 5000;
/**
* Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,161 @@
+package org.apache.streams.core.builders;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import org.apache.streams.core.test.writer.DatumCounterWriter;
+import org.apache.streams.core.test.writer.SystemOutWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.HashSet;
+import java.util.Scanner;
+
+/**
+ * Basic Tests for the LocalStreamBuilder.
+ *
+ * Test are performed by redirecting system out and counting the number of lines that the SystemOutWriter prints
+ * to System.out. The SystemOutWriter also prints one line when cleanUp() is called, so this is why it tests for
+ * the numDatums +1.
+ *
+ *
+ */
+public class LocalStreamBuilderTest {
+
+ ByteArrayOutputStream out;
+
+ @Before
+ public void setSystemOut() {
+ out = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(out));
+ }
+
+ @Test
+ public void testStreamIdValidations() {
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+ Exception exp = null;
+ try {
+ builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+ } catch (RuntimeException e) {
+ exp = e;
+ }
+ assertNotNull(exp);
+ exp = null;
+ builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor(), 1, "id");
+ try {
+ builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor(), 1, "id", "id2");
+ } catch (RuntimeException e) {
+ exp = e;
+ }
+ assertNotNull(exp);
+ }
+
+ @Test
+ public void testBasicLinearStream1() {
+ int numDatums = 1;
+ StreamBuilder builder = new LocalStreamBuilder();
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ SystemOutWriter writer = new SystemOutWriter();
+ builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", processor, 1, "sp1")
+ .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+ builder.start();
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertEquals(numDatums+1, count);
+ }
+
+ @Test
+ public void testBasicLinearStream2() {
+ int numDatums = 1000;
+ StreamBuilder builder = new LocalStreamBuilder();
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ SystemOutWriter writer = new SystemOutWriter();
+ builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", processor, 1, "sp1")
+ .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+ builder.start();
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertEquals(numDatums+1, count);
+ }
+
+ @Test
+ public void testParallelLinearStream1() {
+ int numDatums = 10000;
+ int parallelHint = 40;
+ PassthroughDatumCounterProcessor.sawData = new HashSet<Integer>();
+ PassthroughDatumCounterProcessor.claimedNumber = new HashSet<Integer>();
+ StreamBuilder builder = new LocalStreamBuilder();
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ SystemOutWriter writer = new SystemOutWriter();
+ builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", processor, parallelHint, "sp1")
+ .addStreamsPersistWriter("writer1", writer, 1, "proc1");
+ builder.start();
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertEquals(numDatums+1, count); //+1 is to make sure cleanup is called on the writer
+ assertEquals(parallelHint, PassthroughDatumCounterProcessor.claimedNumber.size()); //test 40 were initialized
+ assertTrue(PassthroughDatumCounterProcessor.sawData.size() > 1 && PassthroughDatumCounterProcessor.sawData.size() <= parallelHint); //test more than one processor got data
+ }
+
+ @Test
+ public void testBasicMergeStream() {
+ int numDatums1 = 1;
+ int numDatums2 = 1000;
+ PassthroughDatumCounterProcessor processor1 = new PassthroughDatumCounterProcessor();
+ PassthroughDatumCounterProcessor processor2 = new PassthroughDatumCounterProcessor();
+ SystemOutWriter writer = new SystemOutWriter();
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums1))
+ .newReadCurrentStream("sp2", new NumericMessageProvider(numDatums2))
+ .addStreamsProcessor("proc1", processor1, 1, "sp1")
+ .addStreamsProcessor("proc2", processor2, 1, "sp2")
+ .addStreamsPersistWriter("writer1", writer, 1, "proc1", "proc2");
+ builder.start();
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertEquals(numDatums1+numDatums2+1, count);
+ }
+
+ @Test
+ public void testBasicBranch() {
+ int numDatums = 1000;
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "prov1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1", "proc2");
+ builder.start();
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertEquals((numDatums*2)+1, count);
+ }
+
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/builders/ToyLocalBuilderExample.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,28 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.test.processors.DoNothingProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import org.apache.streams.core.test.writer.DoNothingWriter;
+import org.apache.streams.core.test.writer.SystemOutWriter;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class ToyLocalBuilderExample {
+
+ /**
+ * A simple example of how to run a stream in local mode.
+ * @param args
+ */
+ public static void main(String[] args) {
+ StreamBuilder builder = new LocalStreamBuilder(new LinkedBlockingQueue<StreamsDatum>());
+ builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
+ .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
+ .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
+ builder.start();
+ }
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/tasks/BasicTasksTest.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,285 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
+import org.apache.streams.core.test.providers.NumericMessageProvider;
+import static org.junit.Assert.*;
+
+import org.apache.streams.core.test.writer.DatumCounterWriter;
+import org.junit.Test;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class BasicTasksTest {
+
+
+
+ @Test
+ public void testProviderTask() {
+ int numMessages = 100;
+ NumericMessageProvider provider = new NumericMessageProvider(numMessages);
+ StreamsProviderTask task = new StreamsProviderTask(provider, false);
+ Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ task.addOutputQueue(outQueue);
+ Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ Exception exp = null;
+ try {
+ task.addInputQueue(inQueue);
+ } catch (UnsupportedOperationException uoe) {
+ exp = uoe;
+ }
+ assertNotNull(exp);
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(outQueue.size() != numMessages) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ fail("Provider task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ };
+ }
+
+ @Test
+ public void testProcessorTask() {
+ int numMessages = 100;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 && outQueue.size() != numMessages) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ task.stopTask();
+ assertEquals(numMessages, processor.getMessageCount());
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ }
+ }
+
+ @Test
+ public void testWriterTask() {
+ int numMessages = 100;
+ DatumCounterWriter writer = new DatumCounterWriter();
+ StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+ Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+
+ Exception exp = null;
+ try {
+ task.addOutputQueue(outQueue);
+ } catch (UnsupportedOperationException uoe) {
+ exp = uoe;
+ }
+ assertNotNull(exp);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ task.stopTask();
+ assertEquals(numMessages, writer.getDatumsCounted());
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ }
+ }
+
+ @Test
+ public void testMergeTask() {
+ int numMessages = 100;
+ int incoming = 5;
+ StreamsMergeTask task = new StreamsMergeTask();
+ Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ task.addOutputQueue(outQueue);
+ for(int i=0; i < incoming; ++i) {
+ task.addInputQueue(createInputQueue(numMessages));
+ }
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(outQueue.size() != incoming * numMessages ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
+ }
+ }
+ task.stopTask();
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ }
+ }
+
+ @Test
+ public void testBranching() {
+ int numMessages = 100;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue1);
+ task.addOutputQueue(outQueue2);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+ }
+ }
+ task.stopTask();
+
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ }
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, outQueue1.size());
+ assertEquals(numMessages, outQueue2.size());
+ }
+
+ @Test
+ public void testBranchingSerialization() {
+ int numMessages = 1;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
+ Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue1);
+ task.addOutputQueue(outQueue2);
+ task.addInputQueue(inQueue);
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+ }
+ }
+ task.stopTask();
+
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in aloted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interupted.");
+ }
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, outQueue1.size());
+ assertEquals(numMessages, outQueue2.size());
+ StreamsDatum datum1 = outQueue1.poll();
+ StreamsDatum datum2 = outQueue2.poll();
+ assertNotNull(datum1);
+ assertEquals(datum1, datum2);
+ datum1.setDocument("a");
+ assertNotEquals(datum1, datum2);
+ }
+
+ private Queue<StreamsDatum> createInputQueue(int numDatums) {
+ Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
+ for(int i=0; i < numDatums; ++i) {
+ queue.add(new StreamsDatum(i));
+ }
+ return queue;
+ }
+
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/DoNothingProcessor.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,32 @@
+package org.apache.streams.core.test.processors;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class DoNothingProcessor implements StreamsProcessor {
+
+ List<StreamsDatum> result;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ this.result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ return result;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ System.out.println("Processor clean up!");
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/processors/PassthroughDatumCounterProcessor.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,49 @@
+package org.apache.streams.core.test.processors;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.*;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class PassthroughDatumCounterProcessor implements StreamsProcessor {
+
+ public static Set<Integer> claimedNumber = new HashSet<Integer>();
+ public static final Random rand = new Random();
+ public static Set<Integer> sawData = new HashSet<Integer>();
+
+ private int count = 0;
+ private int id;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ ++this.count;
+ List<StreamsDatum> result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ synchronized (sawData) {
+ sawData.add(this.id);
+ }
+ return result;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ synchronized (claimedNumber) {
+ this.id = rand.nextInt();
+ while(!claimedNumber.add(this.id)) {
+ this.id = rand.nextInt();
+ }
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ public int getMessageCount() {
+ return this.count;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/providers/NumericMessageProvider.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,104 @@
+package org.apache.streams.core.test.providers;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
+ */
+public class NumericMessageProvider implements StreamsProvider {
+
+ private int numMessages;
+
+ public NumericMessageProvider(int numMessages) {
+ this.numMessages = numMessages;
+ }
+
+ @Override
+ public void startStream() {
+ // no op
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return new ResultSet();
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return new ResultSet();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return new ResultSet();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+
+ private class ResultSet extends StreamsResultSet {
+
+ private ResultSet() {
+ super(new ConcurrentLinkedQueue<StreamsDatum>());
+ }
+
+// @Override
+// public long getStartTime() {
+// return 0;
+// }
+//
+// @Override
+// public long getEndTime() {
+// return 0;
+// }
+//
+// @Override
+// public String getSourceId() {
+// return null;
+// }
+//
+// @Override
+// public BigInteger getMaxSequence() {
+// return null;
+// }
+
+ @Override
+ public Iterator<StreamsDatum> iterator() {
+ return new Iterator<StreamsDatum>() {
+ private int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i < numMessages;
+ }
+
+ @Override
+ public StreamsDatum next() {
+ return new StreamsDatum(i++);
+ }
+
+ @Override
+ public void remove() {
+
+ }
+ };
+ }
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DatumCounterWriter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,31 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/18/14.
+ */
+public class DatumCounterWriter implements StreamsPersistWriter{
+
+ private int counter = 0;
+
+ @Override
+ public void write(StreamsDatum entry) {
+ ++this.counter;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ System.out.println("clean up called");
+ }
+
+ public int getDatumsCounted() {
+ return this.counter;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/DoNothingWriter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,24 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class DoNothingWriter implements StreamsPersistWriter {
+ @Override
+ public void write(StreamsDatum entry) {
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ System.out.println("Writer Clean Up!");
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java?rev=1574660&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/test/java/org/apache/streams/core/test/writer/SystemOutWriter.java Wed Mar 5 21:06:50 2014
@@ -0,0 +1,24 @@
+package org.apache.streams.core.test.writer;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+/**
+ * Created by rebanks on 2/20/14.
+ */
+public class SystemOutWriter implements StreamsPersistWriter {
+ @Override
+ public void write(StreamsDatum entry) {
+ System.out.println(entry.document);
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ System.out.println("Clean up called writer!");
+ }
+}
Modified: incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml?rev=1574660&r1=1574659&r2=1574660&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml Wed Mar 5 21:06:50 2014
@@ -61,11 +61,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml</groupId>
- <artifactId>aalto-xml</artifactId>
- </dependency>
-
- <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>