You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/31 01:29:56 UTC

git commit: bumped deps very basic test passing

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-49 [created] 24e427fab


bumped deps
very basic test passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/24e427fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/24e427fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/24e427fa

Branch: refs/heads/STREAMS-49
Commit: 24e427fabe1a23cafc2a8dd16bfc9891a50a805b
Parents: eec3aa9
Author: sblackmon <sb...@apache.org>
Authored: Thu Oct 30 19:28:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Thu Oct 30 19:28:46 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-kafka/README.md |  21 ++++
 streams-contrib/streams-persist-kafka/pom.xml   |  57 ++++++-----
 .../streams/kafka/KafkaPersistReader.java       |  74 ++++++--------
 .../streams/kafka/KafkaPersistReaderTask.java   |  22 ++--
 .../streams/kafka/KafkaPersistWriter.java       |  43 ++------
 .../streams/kafka/KafkaPersistWriterTask.java   |   2 +-
 .../streams/kafka/StreamsPartitioner.java       |  10 +-
 .../src/main/resources/reference.properties     |  16 ---
 .../streams/kafka/test/TestKafkaCluster.java    |  51 ++++++++++
 .../streams/kafka/test/TestKafkaPersist.java    | 101 +++++++++++++++++++
 10 files changed, 261 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/README.md b/streams-contrib/streams-persist-kafka/README.md
new file mode 100644
index 0000000..dce830e
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/README.md
@@ -0,0 +1,21 @@
+streams-persist-kafka
+=====================
+
+Read and write to Kafka
+
+Example reader/writer configuration:
+
+    kafka.metadata.broker.list=localhost:9092
+    
+    kafka.zk.connect=localhost:2181
+    
+    kafka.topic=topic
+    
+    kafka.groupid=group
+    
+java    -cp jar -Dconfig.file={json/hocon typesafe config} \
+        -Dkafka.metadata.broker.list=localhost:9092 class \
+        -Dkafka.zk.connect=localhost:2181 \
+        -Dkafka.topic=topic \
+        -Dkafka.groupid=group
+    

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index f86a0f3..4e9e375 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -12,8 +12,8 @@
     <artifactId>streams-persist-kafka</artifactId>
 
     <properties>
-        <scala.version>2.9.2</scala.version>
-        <kafka.version>0.8.0</kafka.version>
+        <scala.version>2.10</scala.version>
+        <kafka.version>0.8.1.1</kafka.version>
     </properties>
 
     <dependencies>
@@ -43,14 +43,6 @@
             <version>${kafka.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>com.sun.jdmk</groupId>
                     <artifactId>jmxtools</artifactId>
                 </exclusion>
@@ -67,35 +59,46 @@
         <dependency>
             <groupId>com.101tec</groupId>
             <artifactId>zkclient</artifactId>
-            <version>0.3</version>
+            <version>0.4</version>
             <scope>compile</scope>
             <exclusions>
                 <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
+                <groupId>org.apache.zookeeper</groupId>
+                <artifactId>zookeeper</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.version}</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
         <plugins>
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index a7810b1..a77c941 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -19,6 +19,7 @@
 package org.apache.streams.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
 import com.typesafe.config.Config;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
@@ -65,50 +67,37 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
     private ExecutorService executor = Executors.newSingleThreadExecutor();
 
     public KafkaPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
     }
 
-    public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue = persistQueue;
+    public KafkaPersistReader(KafkaConfiguration config) {
+        this.config = config;
     }
 
-    public void setConfig(KafkaConfiguration config) {
-        this.config = config;
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
     }
 
     @Override
     public void startStream() {
 
-        Properties props = new Properties();
-        props.setProperty("serializer.encoding", "UTF8");
-
-        consumerConfig = new ConsumerConfig(props);
-
-        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
-
-        Whitelist topics = new Whitelist(config.getTopic());
-        VerifiableProperties vprops = new VerifiableProperties(props);
-
-        inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
-
         for (final KafkaStream stream : inStreams) {
             executor.submit(new KafkaPersistReaderTask(this, stream));
         }
-
-    }
-
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
     }
 
     @Override
     public StreamsResultSet readCurrent() {
-        return null;
+
+        StreamsResultSet current;
+
+        synchronized( KafkaPersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            persistQueue.clear();
+        }
+
+        return current;
     }
 
     @Override
@@ -126,28 +115,31 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
         return !executor.isShutdown() && !executor.isTerminated();
     }
 
-    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+    @Override
+    public void prepare(Object configurationObject) {
+
         Properties props = new Properties();
-        props.put("zookeeper.connect", a_zookeeper);
-        props.put("group.id", a_groupId);
-        props.put("zookeeper.session.timeout.ms", "400");
+        props.put("zookeeper.connect", config.getZkconnect());
+        props.put("group.id", "streams");
+        props.put("zookeeper.session.timeout.ms", "1000");
         props.put("zookeeper.sync.time.ms", "200");
         props.put("auto.commit.interval.ms", "1000");
-        return new ConsumerConfig(props);
-    }
+        props.put("auto.offset.reset", "smallest");
 
-    @Override
-    public void prepare(Object configurationObject) {
+        consumerConfig = new ConsumerConfig(props);
+
+        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+
+        Whitelist topics = new Whitelist(config.getTopic());
+        VerifiableProperties vprops = new VerifiableProperties(props);
 
+        inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+
+        persistQueue = new ConcurrentLinkedQueue<>();
     }
 
     @Override
     public void cleanUp() {
         consumerConnector.shutdown();
-        while( !executor.isTerminated()) {
-            try {
-                executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {}
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 83493e0..03fa291 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.kafka;
 
+import com.google.common.base.Preconditions;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.message.MessageAndMetadata;
@@ -44,18 +45,17 @@ public class KafkaPersistReaderTask implements Runnable {
     @Override
     public void run() {
 
-        MessageAndMetadata<String,String> item;
-        while(true) {
-
-            ConsumerIterator<String, String> it = stream.iterator();
-            while (it.hasNext()) {
-                item = it.next();
-                reader.persistQueue.add(new StreamsDatum(item.message()));
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
+        Preconditions.checkNotNull(this.stream);
+
+        ConsumerIterator<String, String> it = stream.iterator();
+        while (it.hasNext()) {
+            MessageAndMetadata<String,String> item = it.next();
+            reader.persistQueue.add(new StreamsDatum(item.message()));
         }
+        try {
+            Thread.sleep(new Random().nextInt(100));
+        } catch (InterruptedException e) {}
+
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index c5f029a..01db32f 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -36,7 +36,7 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
 
@@ -49,18 +49,10 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
     private Producer<String, String> producer;
 
     public KafkaPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue = persistQueue;
+       this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
     }
 
-    public void setConfig(KafkaConfiguration config) {
+    public KafkaPersistWriter(KafkaConfiguration config) {
         this.config = config;
     }
 
@@ -71,6 +63,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
         props.put("request.required.acks", "1");
+        props.put("auto.create.topics.enable", "true");
 
         ProducerConfig config = new ProducerConfig(props);
 
@@ -79,27 +72,13 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
         new Thread(new KafkaPersistWriterTask(this)).start();
     }
 
-    public void stop() {
-        producer.close();
-    }
-
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
-
-    public Queue<StreamsDatum> getPersistQueue() {
-        return this.persistQueue;
-    }
-
     @Override
     public void write(StreamsDatum entry) {
 
         try {
             String text = mapper.writeValueAsString(entry);
 
-            String hash = GuidUtils.generateGuid(text);
-
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), hash, text);
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), entry.getId(), text);
 
             producer.send(data);
 
@@ -109,19 +88,15 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
     }
 
     @Override
-    public void run() {
-        start();
-
-        // stop();
-    }
-
-    @Override
     public void prepare(Object configurationObject) {
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+
         start();
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        producer.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
index 5d8ee9e..4aa9707 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
@@ -38,7 +38,7 @@ public class KafkaPersistWriterTask implements Runnable {
     public void run() {
 
         while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
+            if( writer.persistQueue.peek() != null ) {
                 try {
                     StreamsDatum entry = writer.persistQueue.remove();
                     writer.write(entry);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index ebfff9a..fa38ca2 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -24,17 +24,15 @@ package org.apache.streams.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-public class StreamsPartitioner implements Partitioner<String> {
+public class StreamsPartitioner implements Partitioner {
+
     public StreamsPartitioner (VerifiableProperties props) {
 
     }
 
-    public int partition(String key, int a_numPartitions) {
+    public int partition(Object key, int a_numPartitions) {
         int partition = 0;
-        int offset = key.lastIndexOf('.');
-        if (offset > 0) {
-            partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
-        }
+        partition = key.hashCode() % a_numPartitions;
         return partition;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
index 967264d..164c990 100644
--- a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
+++ b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
@@ -20,22 +20,6 @@ kafka.serializer.class=kafka.serializer.DefaultEncoder
 # allow topic level compression
 #compressed.topics=
 
-############################# Async Producer #############################
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-#batch.num.messages=
-
 kafka.groupid=kafka
 
 kafka.zk.connect=localhost:2181

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
new file mode 100644
index 0000000..167bc8c
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
@@ -0,0 +1,51 @@
+package org.apache.streams.kafka.test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.TestUtils;
+
+import org.apache.curator.test.TestingServer;
+
+public class TestKafkaCluster {
+    KafkaServerStartable kafkaServer;
+    TestingServer zkServer;
+
+    public TestKafkaCluster() throws Exception {
+        zkServer = new TestingServer();
+        KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
+        kafkaServer = new KafkaServerStartable(config);
+        kafkaServer.startup();
+    }
+
+    private static KafkaConfig getKafkaConfig(final String
+                                                      zkConnectString) {
+        scala.collection.Iterator<Properties> propsI =
+                TestUtils.createBrokerConfigs(1).iterator();
+        assert propsI.hasNext();
+        Properties props = propsI.next();
+        assert props.containsKey("zookeeper.connect");
+        props.put("zookeeper.connect", zkConnectString);
+        return new KafkaConfig(props);
+    }
+
+    public String getKafkaBrokerString() {
+        return String.format("localhost:%d",
+                kafkaServer.serverConfig().port());
+    }
+
+    public String getZkConnectString() {
+        return zkServer.getConnectString();
+    }
+
+    public int getKafkaPort() {
+        return kafkaServer.serverConfig().port();
+    }
+
+    public void stop() throws IOException {
+        kafkaServer.shutdown();
+        zkServer.stop();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
new file mode 100644
index 0000000..1d04f74
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -0,0 +1,101 @@
+package org.apache.streams.kafka.test;
+
+import com.google.common.collect.Lists;
+import kafka.admin.AdminUtils;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.kafka.KafkaConfiguration;
+import org.apache.streams.kafka.KafkaPersistReader;
+import org.apache.streams.kafka.KafkaPersistWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.util.Properties;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestKafkaPersist {
+
+    private TestKafkaCluster testKafkaCluster;
+    private KafkaConfiguration testConfiguration;
+
+    private String testTopic = "testTopic";
+
+    @Before
+    public void prepareTest() {
+
+
+        try {
+            testKafkaCluster = new TestKafkaCluster();
+        } catch (Throwable e ) {
+            e.printStackTrace();
+        }
+
+        testConfiguration = new KafkaConfiguration();
+        testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
+        testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
+        testConfiguration.setTopic(testTopic);
+
+        ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
+
+        AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+    }
+
+    @Test
+    public void testPersistWriterString() {
+
+        assert(testConfiguration != null);
+        assert(testKafkaCluster != null);
+
+        KafkaPersistWriter testPersistWriter = new KafkaPersistWriter(testConfiguration);
+        testPersistWriter.prepare(null);
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+        testPersistWriter.cleanUp();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
+
+        KafkaPersistReader testPersistReader = new KafkaPersistReader(testConfiguration);
+        try {
+            testPersistReader.prepare(null);
+        } catch( Throwable e ) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        testPersistReader.startStream();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
+
+        StreamsResultSet testResult = testPersistReader.readCurrent();
+
+        testPersistReader.cleanUp();
+
+        assert(testResult.size() == 1);
+
+    }
+}