You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/11 02:06:58 UTC
incubator-streams git commit: simple file reader/writer
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-214 [created] d72d8f7e1
simple file reader/writer
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d72d8f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d72d8f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d72d8f7e
Branch: refs/heads/STREAMS-214
Commit: d72d8f7e174ed528007473c9179444bd1eb109c6
Parents: bfa9466
Author: sblackmon <sb...@apache.org>
Authored: Mon Nov 10 19:06:54 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Mon Nov 10 19:06:54 2014 -0600
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
streams-contrib/streams-persist-file/README.md | 12 ++
streams-contrib/streams-persist-file/pom.xml | 125 ++++++++++++++
.../apache/streams/file/FileConfigurator.java | 51 ++++++
.../apache/streams/file/FilePersistReader.java | 163 +++++++++++++++++++
.../apache/streams/file/FilePersistWriter.java | 107 ++++++++++++
.../apache/streams/file/FileConfiguration.json | 14 ++
.../apache/streams/file/test/FilePersistIT.java | 112 +++++++++++++
.../streams/file/test/TestFilePersist.java | 60 +++++++
9 files changed, 645 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index fcec297..772ce49 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
<module>streams-persist-cassandra</module>
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
+ <module>streams-persist-file</module>
<module>streams-persist-hbase</module>
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/README.md b/streams-contrib/streams-persist-file/README.md
new file mode 100644
index 0000000..30a0d1c
--- /dev/null
+++ b/streams-contrib/streams-persist-file/README.md
@@ -0,0 +1,12 @@
+streams-persist-file
+=====================
+
+Read from / write to a file-backed queue
+
+Example reader/writer configuration:
+
+ file {
+ path = "/tmp/file-queue.txt"
+ }
+
+The reader consumes the lines that the writer appended to the file.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/pom.xml b/streams-contrib/streams-persist-file/pom.xml
new file mode 100644
index 0000000..3eed49d
--- /dev/null
+++ b/streams-contrib/streams-persist-file/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-file</artifactId>
+
+ <properties>
+ <tape.version>1.2.3</tape.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup</groupId>
+ <artifactId>tape</artifactId>
+ <version>${tape.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-console</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.file.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
new file mode 100644
index 0000000..7e1bd96
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link FileConfiguration} from a typesafe {@link Config} subtree.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class FileConfigurator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileConfigurator.class);
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Renders the given config subtree as concise JSON and binds it to a
+     * {@link FileConfiguration}.
+     *
+     * @param kafka the config subtree to bind; callers in this module pass the
+     *              {@code file} subtree of the global streams configuration
+     * @return the parsed configuration, or {@code null} when the subtree cannot be bound
+     */
+    public static FileConfiguration detectConfiguration(Config kafka) {
+        try {
+            String json = kafka.root().render(ConfigRenderOptions.concise());
+            return mapper.readValue(json, FileConfiguration.class);
+        } catch (Exception e) {
+            // Best effort: callers receive null, but the cause is kept in the log
+            // instead of being dumped to stderr.
+            LOGGER.warn("Could not parse FileConfiguration", e);
+            return null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
new file mode 100644
index 0000000..75ee8b6
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
+import com.squareup.tape.QueueFile;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads string documents from a tape {@link QueueFile}, removing each record from
+ * the file once it has been queued for the caller.
+ */
+public class FilePersistReader implements StreamsPersistReader, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistReader.class);
+
+    /** Datums drained from the queue file and not yet handed out by readCurrent(). */
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private FileConfiguration config;
+
+    /** Opened in prepare() and closed in cleanUp(); a live file handle is kept out of the serialized form. */
+    private transient QueueFile queueFile;
+
+    private boolean isStarted = false;
+    private boolean isStopped = false;
+
+    /** Creates a reader configured from the {@code file} section of the global streams config. */
+    public FilePersistReader() {
+        this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file")));
+    }
+
+    /**
+     * @param config supplies the path of the queue file that prepare() opens
+     */
+    public FilePersistReader(FileConfiguration config) {
+        this.config = config;
+    }
+
+    /** Delegates to {@link #readCurrent()}. */
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    /** Marks the reader as started; no background work is launched. */
+    @Override
+    public void startStream() {
+        isStarted = true;
+    }
+
+    /**
+     * Drains the records currently in the queue file and returns them as a result set.
+     * Only the first line of each record is used, and a record is removed from the file
+     * only after it has been queued.
+     *
+     * @return a result set holding a copy of everything queued so far
+     */
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        while (!queueFile.isEmpty()) {
+            try {
+                byte[] bytes = queueFile.peek();
+                BufferedReader buf = new BufferedReader(
+                        new InputStreamReader(new ByteArrayInputStream(bytes)));
+                String s = buf.readLine();
+                LOGGER.debug("Read from queue file: {}", s);
+                write(new StreamsDatum(s));
+                queueFile.remove();
+            } catch (IOException e) {
+                // The head record was not removed, so looping again would retry the same
+                // failing record forever. Stop and return what has been read so far.
+                LOGGER.error("Failed reading from queue file", e);
+                break;
+            }
+        }
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+        persistQueue.clear();
+
+        return current;
+    }
+
+    /** Queues one datum for the next readCurrent() result. */
+    private void write(StreamsDatum entry) {
+        persistQueue.offer(entry);
+    }
+
+    /** Not implemented by this reader; always returns {@code null}. */
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        return null;
+    }
+
+    /** Not implemented by this reader; always returns {@code null}. */
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    /**
+     * @return {@code true} after startStream() and before cleanUp()
+     */
+    @Override
+    public boolean isRunning() {
+        return isStarted && !isStopped;
+    }
+
+    /**
+     * Opens the queue file named by the configuration and creates the in-memory queue.
+     *
+     * @param configurationObject ignored; the configuration given to the constructor is used
+     * @throws NullPointerException if the queue file could not be opened
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        try {
+            // NOTE(review): presumably gives a writer time to create the file first - confirm.
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            // Preserve the interrupt for whoever owns this thread.
+            Thread.currentThread().interrupt();
+        }
+
+        try {
+            queueFile = new QueueFile(new File(config.getPath()));
+        } catch (IOException e) {
+            LOGGER.error("Could not open queue file {}", config.getPath(), e);
+        }
+
+        Preconditions.checkNotNull(queueFile, "queue file could not be opened");
+
+        this.persistQueue = new ConcurrentLinkedQueue<>();
+
+    }
+
+    /** Closes the queue file, if one was opened, and marks the reader as stopped. */
+    @Override
+    public void cleanUp() {
+        try {
+            if (queueFile != null) {
+                queueFile.close();
+            }
+        } catch (IOException e) {
+            LOGGER.error("Failed closing queue file", e);
+        } finally {
+            queueFile = null;
+            isStopped = true;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
new file mode 100644
index 0000000..bbae9fb
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.squareup.tape.QueueFile;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.util.GuidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Appends the string document of each datum to a tape {@link QueueFile}.
+ */
+public class FilePersistWriter implements StreamsPersistWriter, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistWriter.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper;
+
+    private FileConfiguration config;
+
+    /** Opened in prepare() and closed in cleanUp(); a live file handle is kept out of the serialized form. */
+    private transient QueueFile queueFile;
+
+    /** Creates a writer configured from the {@code file} section of the global streams config. */
+    public FilePersistWriter() {
+        this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file")));
+    }
+
+    /**
+     * @param config supplies the path of the queue file that prepare() opens
+     */
+    public FilePersistWriter(FileConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Appends the datum's document to the queue file as one record. An I/O failure is
+     * logged and the datum is dropped; it is not rethrown.
+     *
+     * @param entry datum whose document must be a non-empty String
+     * @throws IllegalArgumentException if the resolved key is empty, or the document is
+     *                                  not a non-empty String
+     */
+    @Override
+    public void write(StreamsDatum entry) {
+
+        String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter");
+
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "datum id must not be empty");
+        Preconditions.checkArgument(entry.getDocument() instanceof String, "document must be a String");
+        String document = (String) entry.getDocument();
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(document), "document must not be empty");
+
+        try {
+            // Encoded with the platform default charset.
+            queueFile.add(document.getBytes());
+        } catch (IOException e) {
+            LOGGER.error("Failed writing datum {} to queue file", key, e);
+        }
+    }
+
+    /**
+     * Opens the queue file named by the configuration and creates the in-memory queue.
+     *
+     * @param configurationObject ignored; the configuration given to the constructor is used
+     * @throws NullPointerException if the queue file could not be opened
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        mapper = new ObjectMapper();
+
+        try {
+            queueFile = new QueueFile(new File(config.getPath()));
+        } catch (IOException e) {
+            LOGGER.error("Could not open queue file {}", config.getPath(), e);
+        }
+
+        Preconditions.checkNotNull(queueFile, "queue file could not be opened");
+
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    }
+
+    /** Closes the queue file, if one was opened. */
+    @Override
+    public void cleanUp() {
+        try {
+            if (queueFile != null) {
+                queueFile.close();
+            }
+        } catch (IOException e) {
+            LOGGER.error("Failed closing queue file", e);
+        } finally {
+            queueFile = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
new file mode 100644
index 0000000..5604737
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
@@ -0,0 +1,14 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.file.FileConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "A file path to read/write from",
+ "default": "/tmp/streams-file-queue.txt"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
new file mode 100644
index 0000000..e7d50bf
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
@@ -0,0 +1,112 @@
+package org.apache.streams.file.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.console.ConsolePersistReader;
+import org.apache.streams.console.ConsolePersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.file.FileConfiguration;
+import org.apache.streams.file.FilePersistReader;
+import org.apache.streams.file.FilePersistWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration test that sends three datums through a {@link FilePersistWriter} and
+ * reads them back through a {@link FilePersistReader} inside a local stream.
+ *
+ * Created by sblackmon on 10/20/14.
+ */
+public class FilePersistIT {
+
+    private FileConfiguration testConfiguration;
+
+    ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class);
+    ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class);
+
+    StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}");
+    StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}");
+    StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}");
+
+    /** Removes any queue file left by an earlier run and stubs the mocked source reader. */
+    @Before
+    public void prepareTest() {
+
+        // Uses the default path from the schema; call testConfiguration.setPath(...) to override it.
+        testConfiguration = new FileConfiguration();
+
+        File file = new File(testConfiguration.getPath());
+        if (file.exists()) {
+            file.delete();
+        }
+
+        // First poll yields the three datums, every later poll yields null.
+        PowerMockito.when(reader.readCurrent())
+                .thenReturn(
+                        new StreamsResultSet(Queues.newConcurrentLinkedQueue(
+                                Lists.newArrayList(testDatum1, testDatum2, testDatum3)))
+                ).thenReturn(null);
+    }
+
+    /** Wires mocked stdin -> file writer and file reader -> mocked stdout, then checks all datums arrive. */
+    @Test
+    public void testPersistStream() {
+
+        // JUnit assertion: unlike the assert keyword it runs even when the JVM has assertions disabled.
+        Assert.assertNotNull(testConfiguration);
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000);
+
+        StreamBuilder builder = new LocalStreamBuilder(1, streamConfig);
+
+        FilePersistWriter fileWriter = new FilePersistWriter(testConfiguration);
+        FilePersistReader fileReader = new FilePersistReader(testConfiguration);
+
+        builder.newReadCurrentStream("stdin", reader);
+        builder.addStreamsPersistWriter("writer", fileWriter, 1, "stdin");
+        builder.newReadCurrentStream("reader", fileReader);
+        builder.addStreamsPersistWriter("stdout", writer, 1, "reader");
+
+        builder.start();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            // Preserve the interrupt for the test runner.
+            Thread.currentThread().interrupt();
+        }
+
+        builder.stop();
+
+        Mockito.verify(writer).write(testDatum1);
+        Mockito.verify(writer).write(testDatum2);
+        Mockito.verify(writer).write(testDatum3);
+
+    }
+
+    @After
+    public void shutdownTest() {
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
new file mode 100644
index 0000000..1ffd90a
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
@@ -0,0 +1,60 @@
+package org.apache.streams.file.test;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.file.FileConfiguration;
+import org.apache.streams.file.FilePersistReader;
+import org.apache.streams.file.FilePersistWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Round-trip test: writes one document with {@link FilePersistWriter} and reads it
+ * back with {@link FilePersistReader}.
+ *
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestFilePersist {
+
+    private FileConfiguration testConfiguration;
+
+    /** Writes a single JSON string to a fresh queue file and expects exactly one datum back. */
+    @Test
+    public void testPersistWriterString() {
+
+        // Uses the default path from the schema; call testConfiguration.setPath(...) to override it.
+        testConfiguration = new FileConfiguration();
+
+        File file = new File(testConfiguration.getPath());
+        if (file.exists()) {
+            file.delete();
+        }
+
+        FilePersistWriter testPersistWriter = new FilePersistWriter(testConfiguration);
+        testPersistWriter.prepare(testConfiguration);
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+        testPersistWriter.cleanUp();
+
+        FilePersistReader testPersistReader = new FilePersistReader(testConfiguration);
+        try {
+            testPersistReader.prepare(testConfiguration);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        StreamsResultSet testResult = testPersistReader.readCurrent();
+
+        testPersistReader.cleanUp();
+
+        // JUnit assertion: unlike the assert keyword it runs even when the JVM has assertions disabled.
+        Assert.assertEquals("expected exactly one datum in the result set", 1, testResult.size());
+
+    }
+
+}