You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/11 02:06:58 UTC

incubator-streams git commit: simple file reader/writer

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-214 [created] d72d8f7e1


simple file reader/writer


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d72d8f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d72d8f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d72d8f7e

Branch: refs/heads/STREAMS-214
Commit: d72d8f7e174ed528007473c9179444bd1eb109c6
Parents: bfa9466
Author: sblackmon <sb...@apache.org>
Authored: Mon Nov 10 19:06:54 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Mon Nov 10 19:06:54 2014 -0600

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-persist-file/README.md  |  12 ++
 streams-contrib/streams-persist-file/pom.xml    | 125 ++++++++++++++
 .../apache/streams/file/FileConfigurator.java   |  51 ++++++
 .../apache/streams/file/FilePersistReader.java  | 163 +++++++++++++++++++
 .../apache/streams/file/FilePersistWriter.java  | 107 ++++++++++++
 .../apache/streams/file/FileConfiguration.json  |  14 ++
 .../apache/streams/file/test/FilePersistIT.java | 112 +++++++++++++
 .../streams/file/test/TestFilePersist.java      |  60 +++++++
 9 files changed, 645 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index fcec297..772ce49 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
         <module>streams-persist-cassandra</module>
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
+        <module>streams-persist-file</module>
         <module>streams-persist-hbase</module>
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/README.md b/streams-contrib/streams-persist-file/README.md
new file mode 100644
index 0000000..30a0d1c
--- /dev/null
+++ b/streams-contrib/streams-persist-file/README.md
@@ -0,0 +1,12 @@
+streams-persist-file
+=====================
+
+Read from / write to a file-backed queue
+
+Example reader/writer configuration:
+
+    file {
+        path = "/tmp/file-queue.txt"
+    }
+    
+Reader will consume lines from Writer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/pom.xml b/streams-contrib/streams-persist-file/pom.xml
new file mode 100644
index 0000000..3eed49d
--- /dev/null
+++ b/streams-contrib/streams-persist-file/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-file</artifactId>
+
+    <properties>
+        <tape.version>1.2.3</tape.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup</groupId>
+            <artifactId>tape</artifactId>
+            <version>${tape.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.file.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
new file mode 100644
index 0000000..7e1bd96
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helper that builds a {@link FileConfiguration} from a Typesafe {@link Config} subtree.
+ */
+public class FileConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FileConfigurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Renders the given config subtree as concise JSON and binds it to a
+     * {@link FileConfiguration} with Jackson.
+     *
+     * @param kafka the config subtree to bind; despite the parameter name, the reader and
+     *              writer in this module pass the "file" subtree
+     * @return the parsed configuration, or {@code null} if the subtree could not be
+     *         rendered or parsed (a warning carrying the cause is logged in that case)
+     */
+    public static FileConfiguration detectConfiguration(Config kafka) {
+
+        FileConfiguration fileConfiguration = null;
+
+        try {
+            fileConfiguration = mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), FileConfiguration.class);
+        } catch (Exception e) {
+            // Best effort: callers receive null on failure. Hand the exception to the logger so
+            // the cause and stack trace end up in the configured log rather than on stderr.
+            LOGGER.warn("Could not parse FileConfiguration", e);
+        }
+
+        return fileConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
new file mode 100644
index 0000000..75ee8b6
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
+import com.squareup.tape.QueueFile;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Persist reader that drains entries from a tape {@link QueueFile} on local disk and
+ * emits each one as a {@link StreamsDatum} whose document is a String.
+ *
+ * <p>{@link #prepare(Object)} opens the queue file and must be called before any read;
+ * {@link #cleanUp()} closes it again.
+ */
+public class FilePersistReader implements StreamsPersistReader, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistReader.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private FileConfiguration config;
+
+    // Opened in prepare(). QueueFile is not Serializable, so it is excluded from the
+    // serialized form of this (Serializable) reader.
+    private transient QueueFile queueFile;
+
+    // NOTE(review): volatile because isRunning() is presumably polled from a different
+    // thread than the one calling startStream()/cleanUp() -- confirm against the runtime.
+    private volatile boolean isStarted = false;
+    private volatile boolean isStopped = false;
+
+    /**
+     * Creates a reader configured from the "file" subtree of the global streams config.
+     */
+    public FilePersistReader() {
+        this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file")));
+    }
+
+    /**
+     * Creates a reader for the queue file named by the given configuration.
+     *
+     * @param config configuration whose path identifies the queue file
+     */
+    public FilePersistReader(FileConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Delegates to {@link #readCurrent()}.
+     */
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    /**
+     * Marks the reader as started; no background work is launched.
+     */
+    @Override
+    public void startStream() {
+        isStarted = true;
+    }
+
+    /**
+     * Drains every entry currently in the queue file and returns them as a result set.
+     *
+     * <p>Each entry is removed from the file only after it has been decoded and buffered.
+     * If the file cannot be read, draining stops and whatever was buffered so far is
+     * returned; the failing entry stays in the file for the next call.
+     *
+     * @return a result set holding a snapshot of the buffered datums (possibly empty)
+     */
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        while (!queueFile.isEmpty()) {
+            try {
+                byte[] bytes = queueFile.peek();
+                // Decodes with the platform default charset, mirroring the plain
+                // String.getBytes() used by FilePersistWriter.
+                BufferedReader buf = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
+                // Only the first line of the entry becomes the document.
+                String s = buf.readLine();
+                LOGGER.debug("Read from queue file: {}", s);
+                write(new StreamsDatum(s));
+                queueFile.remove();
+            } catch (IOException e) {
+                // The head entry was not removed, so looping again would hit the same
+                // failure forever. Stop draining and let a later call retry.
+                LOGGER.error("Failed to read from queue file; stopping this drain", e);
+                break;
+            }
+        }
+
+        StreamsResultSet current;
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+        persistQueue.clear();
+
+        return current;
+    }
+
+    /** Buffers a decoded datum until the next result set is built. */
+    private void write( StreamsDatum entry ) {
+        persistQueue.offer(entry);
+    }
+
+    /**
+     * Not supported by this reader.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        return null;
+    }
+
+    /**
+     * Not supported by this reader.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    /**
+     * @return {@code true} once {@link #startStream()} has been called and until
+     *         {@link #cleanUp()} runs
+     */
+    @Override
+    public boolean isRunning() {
+        return isStarted && !isStopped;
+    }
+
+    /**
+     * Opens the queue file named by the configuration and allocates the in-memory buffer.
+     *
+     * @param configurationObject ignored; the configuration passed to the constructor is used
+     * @throws NullPointerException if the queue file could not be opened
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        // NOTE(review): fixed one-second delay kept from the original; presumably it gives
+        // a writer in the same stream time to create the file -- confirm.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            // Restore the flag so code further up the stack can still observe the interrupt.
+            Thread.currentThread().interrupt();
+        }
+
+        try {
+            // The schema's only property is "path", so the generated accessor is getPath().
+            queueFile = new QueueFile(new File(config.getPath()));
+        } catch (IOException e) {
+            LOGGER.error("Could not open queue file", e);
+        }
+
+        Preconditions.checkNotNull(queueFile, "queue file could not be opened");
+
+        this.persistQueue = new ConcurrentLinkedQueue<>();
+
+    }
+
+    /**
+     * Closes the queue file (if one is open) and marks the reader as stopped.
+     * Safe to call when {@link #prepare(Object)} failed or was never called.
+     */
+    @Override
+    public void cleanUp() {
+        try {
+            if (queueFile != null) {
+                queueFile.close();
+            }
+        } catch (IOException e) {
+            LOGGER.warn("Failed to close queue file", e);
+        } finally {
+            queueFile = null;
+            isStopped = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
new file mode 100644
index 0000000..bbae9fb
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.file;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.squareup.tape.QueueFile;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.util.GuidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class FilePersistWriter implements StreamsPersistWriter, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistWriter.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper;
+
+    private FileConfiguration config;
+
+    private QueueFile queueFile;
+
+    public FilePersistWriter() {
+       this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file")));
+    }
+
+    public FilePersistWriter(FileConfiguration config) {
+        this.config = config;
+    }
+
+    @Override
+    public void write(StreamsDatum entry) {
+
+        String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter");
+
+        Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
+        Preconditions.checkArgument(entry.getDocument() instanceof String);
+        Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false);
+
+        byte[] item = ((String)entry.getDocument()).getBytes();
+        try {
+            queueFile.add(item);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        mapper = new ObjectMapper();
+
+        try {
+            queueFile = new QueueFile(new File(config.getFile()));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        Preconditions.checkNotNull(queueFile);
+
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    }
+
+    @Override
+    public void cleanUp() {
+        try {
+            queueFile.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            queueFile = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
new file mode 100644
index 0000000..5604737
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.file.FileConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "path": {
+            "type": "string",
+            "description": "A file path to read/write from",
+            "default": "/tmp/streams-file-queue.txt"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
new file mode 100644
index 0000000..e7d50bf
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java
@@ -0,0 +1,112 @@
+package org.apache.streams.file.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.console.ConsolePersistReader;
+import org.apache.streams.console.ConsolePersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.file.FileConfiguration;
+import org.apache.streams.file.FilePersistReader;
+import org.apache.streams.file.FilePersistWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration test that pipes three datums from a mocked console reader through
+ * {@link FilePersistWriter}, back out through {@link FilePersistReader}, and into a
+ * mocked console writer, then verifies that all three arrived.
+ */
+public class FilePersistIT {
+
+    private FileConfiguration testConfiguration;
+
+    ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class);
+    ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class);
+
+    StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}");
+    StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}");
+    StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}");
+
+    /**
+     * Removes any queue file left by an earlier run and stubs the mocked source to emit
+     * the three test datums once, then {@code null}.
+     */
+    @Before
+    public void prepareTest() {
+
+        // No path is set explicitly; this presumably relies on the default declared in
+        // FileConfiguration.json being applied by the generated POJO.
+        testConfiguration = new FileConfiguration();
+
+        // The schema's only property is "path", so the generated accessor is getPath().
+        File file = new File( testConfiguration.getPath());
+        if( file.exists() )
+            file.delete();
+
+        PowerMockito.when(reader.readCurrent())
+                .thenReturn(
+                        new StreamsResultSet(Queues.newConcurrentLinkedQueue(
+                                Lists.newArrayList(testDatum1, testDatum2, testDatum3)))
+                ).thenReturn(null);
+    }
+
+    /**
+     * Runs mocked reader -> file writer and file reader -> mocked writer in one local
+     * stream and checks that every datum reaches the mocked writer.
+     */
+    @Test
+    public void testPersistStream() {
+
+        // JUnit assertion rather than the assert keyword, which is skipped without -ea.
+        Assert.assertNotNull(testConfiguration);
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000);
+
+        StreamBuilder builder = new LocalStreamBuilder(1, streamConfig);
+
+        FilePersistWriter fileWriter = new FilePersistWriter(testConfiguration);
+        FilePersistReader fileReader = new FilePersistReader(testConfiguration);
+
+        builder.newReadCurrentStream("stdin", reader);
+        builder.addStreamsPersistWriter("writer", fileWriter, 1, "stdin");
+        builder.newReadCurrentStream("reader", fileReader);
+        builder.addStreamsPersistWriter("stdout", writer, 1, "reader");
+
+        builder.start();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            // Restore the flag so the test runner can still observe the interrupt.
+            Thread.currentThread().interrupt();
+        }
+
+        builder.stop();
+
+        Mockito.verify(writer).write(testDatum1);
+        Mockito.verify(writer).write(testDatum2);
+        Mockito.verify(writer).write(testDatum3);
+
+    }
+
+    @After
+    public void shutdownTest() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
new file mode 100644
index 0000000..1ffd90a
--- /dev/null
+++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java
@@ -0,0 +1,60 @@
+package org.apache.streams.file.test;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.file.FileConfiguration;
+import org.apache.streams.file.FilePersistReader;
+import org.apache.streams.file.FilePersistWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Round-trip test: writes one String datum with {@link FilePersistWriter} and reads it
+ * back with {@link FilePersistReader} from the same queue file.
+ */
+public class TestFilePersist {
+
+    private FileConfiguration testConfiguration;
+
+    /**
+     * Writes a single JSON string, closes the writer, then expects the reader to return
+     * exactly one datum.
+     */
+    @Test
+    public void testPersistWriterString() {
+
+        // No path is set explicitly; this presumably relies on the default declared in
+        // FileConfiguration.json being applied by the generated POJO.
+        testConfiguration = new FileConfiguration();
+
+        // The schema's only property is "path", so the generated accessor is getPath().
+        File file = new File( testConfiguration.getPath());
+        if( file.exists() )
+            file.delete();
+
+        FilePersistWriter testPersistWriter = new FilePersistWriter(testConfiguration);
+        testPersistWriter.prepare(testConfiguration);
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+        testPersistWriter.cleanUp();
+
+        FilePersistReader testPersistReader = new FilePersistReader(testConfiguration);
+        try {
+            testPersistReader.prepare(testConfiguration);
+        } catch( Throwable e ) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        StreamsResultSet testResult = testPersistReader.readCurrent();
+
+        testPersistReader.cleanUp();
+
+        // JUnit assertion rather than the assert keyword, which is skipped without -ea.
+        Assert.assertTrue("expected exactly one datum to be read back", testResult.size() == 1);
+
+    }
+
+}