You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 18:01:14 UTC

[1/4] incubator-streams git commit: use a group

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-49 3026b8319 -> 0ca5bc90d


use a group


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5f71bd7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5f71bd7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5f71bd7d

Branch: refs/heads/STREAMS-49
Commit: 5f71bd7d1c1572fb5bb1d05702a9f70bad89b2df
Parents: 3026b83
Author: sblackmon <sb...@apache.org>
Authored: Sat Nov 8 12:33:16 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Sat Nov 8 12:33:16 2014 -0800

----------------------------------------------------------------------
 .../test/java/org/apache/streams/kafka/test/KafkaPersistIT.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f71bd7d/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
index 2817fca..5e2cab9 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
@@ -49,6 +49,7 @@ public class KafkaPersistIT {
     private KafkaConfiguration testConfiguration;
 
     private String testTopic = "testTopic";
+    private String testGroup = "testGroup";
 
     ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class);
     ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class);
@@ -68,6 +69,7 @@ public class KafkaPersistIT {
         testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
         testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
         testConfiguration.setTopic(testTopic);
+        testConfiguration.setGroupId(testGroup);
 
         ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
 
@@ -96,7 +98,7 @@ public class KafkaPersistIT {
 
         builder.newReadCurrentStream("stdin", reader);
         builder.addStreamsPersistWriter("writer", kafkaWriter, 1, "stdin");
-        builder.newPerpetualStream("reader", kafkaReader);
+        builder.newReadCurrentStream("reader", kafkaReader);
         builder.addStreamsPersistWriter("stdout", writer, 1, "reader");
 
         builder.start();


[4/4] incubator-streams git commit: merge

Posted by sb...@apache.org.
merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0ca5bc90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0ca5bc90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0ca5bc90

Branch: refs/heads/STREAMS-49
Commit: 0ca5bc90dafd77cd8a7bfaf5b249fb032915c061
Parents: c503011
Author: sblackmon <sb...@apache.org>
Authored: Sun Nov 23 11:00:41 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Sun Nov 23 11:00:41 2014 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/streams/kafka/KafkaPersistWriter.java | 6 ++----
 .../main/java/org/apache/streams/kafka/StreamsPartitioner.java | 2 +-
 .../java/org/apache/streams/kafka/test/TestKafkaPersist.java   | 3 ---
 3 files changed, 3 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ca5bc90/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index e832029..7545456 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -96,11 +96,9 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
         ProducerConfig config = new ProducerConfig(props);
 
-        producer = new Producer(config);
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
 
-    @Override
-    public void prepare(Object configurationObject) {
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        producer = new Producer(config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ca5bc90/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index af0a8b9..a229e1b 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -24,7 +24,7 @@ package org.apache.streams.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-public class StreamsPartitioner implements Partitioner<> {
+public class StreamsPartitioner implements Partitioner {
     public StreamsPartitioner (VerifiableProperties props) {
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ca5bc90/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
index 7fac8ff..5c0d191 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -93,11 +93,8 @@ public class TestKafkaPersist {
             Assert.fail();
         }
 
-<<<<<<< Updated upstream
-=======
         testPersistReader.startStream();
 
->>>>>>> Stashed changes
         try {
             Thread.sleep(1000);
         } catch (InterruptedException ie) {


[3/4] incubator-streams git commit: merge - tests still aren't passing

Posted by sb...@apache.org.
merge - tests still aren't passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c503011c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c503011c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c503011c

Branch: refs/heads/STREAMS-49
Commit: c503011c4465a04f01e321d5193968d1e96e0b12
Parents: 17374fb
Author: sblackmon <sb...@apache.org>
Authored: Sun Nov 23 10:57:20 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Sun Nov 23 10:57:20 2014 -0600

----------------------------------------------------------------------
 streams-contrib/streams-persist-kafka/pom.xml   | 24 ++++++++
 .../streams/kafka/KafkaPersistReaderTask.java   | 62 ++++++++++++++++++++
 .../streams/kafka/KafkaPersistWriter.java       | 11 +++-
 .../streams/kafka/StreamsPartitioner.java       |  4 +-
 .../streams/kafka/test/TestKafkaPersist.java    | 14 +++++
 5 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index 6d98c36..f56885c 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -109,6 +109,30 @@
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
new file mode 100644
index 0000000..03fa291
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.kafka;
+
+import com.google.common.base.Preconditions;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class KafkaPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
+
+    private KafkaPersistReader reader;
+    private KafkaStream<String,String> stream;
+
+    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
+        this.reader = reader;
+        this.stream = stream;
+    }
+
+
+
+    @Override
+    public void run() {
+
+        Preconditions.checkNotNull(this.stream);
+
+        ConsumerIterator<String, String> it = stream.iterator();
+        while (it.hasNext()) {
+            MessageAndMetadata<String,String> item = it.next();
+            reader.persistQueue.add(new StreamsDatum(item.message()));
+        }
+        try {
+            Thread.sleep(new Random().nextInt(100));
+        } catch (InterruptedException e) {}
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 7e300a8..e832029 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -60,12 +60,18 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
     public KafkaPersistWriter(KafkaConfiguration config) {
         this.config = config;
     }
+    
+    public Queue<StreamsDatum> getPersistQueue() {
+        return persistQueue;
+    }
 
     @Override
     public void write(StreamsDatum entry) {
 
+        Preconditions.checkArgument(entry.getDocument() instanceof String);
+        
         String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter");
-
+        
         Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
         Preconditions.checkArgument(entry.getDocument() instanceof String);
         Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false);
@@ -92,8 +98,9 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
         producer = new Producer(config);
 
+    @Override
+    public void prepare(Object configurationObject) {
         this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index b1f2a28..af0a8b9 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -24,15 +24,13 @@ package org.apache.streams.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-public class StreamsPartitioner implements Partitioner {
-
+public class StreamsPartitioner implements Partitioner<> {
     public StreamsPartitioner (VerifiableProperties props) {
 
     }
 
     public int partition(Object key, int a_numPartitions) {
         int partition = 0;
-
         return partition;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
index 572b87e..7fac8ff 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -93,6 +93,11 @@ public class TestKafkaPersist {
             Assert.fail();
         }
 
+<<<<<<< Updated upstream
+=======
+        testPersistReader.startStream();
+
+>>>>>>> Stashed changes
         try {
             Thread.sleep(1000);
         } catch (InterruptedException ie) {
@@ -105,6 +110,14 @@ public class TestKafkaPersist {
 
         assert(testResult.size() == 1);
 
+        StreamsDatum datum = testResult.iterator().next();
+
+        assert(datum.getDocument() instanceof String);
+
+        String datumString = (String) datum.getDocument();
+
+        assert(datumString.contains("dummy") && datumString.contains("true"));
+
     }
 
     @After
@@ -116,5 +129,6 @@ public class TestKafkaPersist {
         } finally {
             testKafkaCluster = null;
         }
+        
     }
 }


[2/4] incubator-streams git commit: simplifying refactoring to kafka persist - NOTE neither test is passing in this commit

Posted by sb...@apache.org.
simplifying refactoring to kafka persist - NOTE neither test is passing in this commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/17374fb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/17374fb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/17374fb9

Branch: refs/heads/STREAMS-49
Commit: 17374fb9496efb8083e287b224733e79f307d384
Parents: 5f71bd7
Author: sblackmon <sb...@apache.org>
Authored: Mon Nov 10 18:41:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Mon Nov 10 18:41:18 2014 -0600

----------------------------------------------------------------------
 .../streams/kafka/KafkaPersistReader.java       | 53 +++++++++------
 .../streams/kafka/KafkaPersistReaderTask.java   | 70 --------------------
 .../streams/kafka/KafkaPersistWriter.java       | 48 ++++++++------
 .../streams/kafka/KafkaPersistWriterTask.java   | 56 ----------------
 .../streams/kafka/test/KafkaPersistIT.java      | 33 +++++++++
 .../streams/kafka/test/TestKafkaPersist.java    | 35 +++++++---
 6 files changed, 120 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index a77c941..6c30be7 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -19,17 +19,18 @@
 package org.apache.streams.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.Whitelist;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
 import kafka.serializer.StringDecoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
@@ -37,13 +38,14 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.streams.kafka.KafkaConfiguration;
-
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,7 +64,10 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
     private ConsumerConfig consumerConfig;
     private ConsumerConnector consumerConnector;
 
-    public List<KafkaStream<String, String>> inStreams;
+    public KafkaStream<String, String> stream;
+
+    private boolean isStarted = false;
+    private boolean isStopped = false;
 
     private ExecutorService executor = Executors.newSingleThreadExecutor();
 
@@ -81,25 +86,29 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
 
     @Override
     public void startStream() {
-
-        for (final KafkaStream stream : inStreams) {
-            executor.submit(new KafkaPersistReaderTask(this, stream));
-        }
+        isStarted = true;
     }
 
     @Override
     public StreamsResultSet readCurrent() {
 
-        StreamsResultSet current;
-
-        synchronized( KafkaPersistReader.class ) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
-            persistQueue.clear();
+        ConsumerIterator it = stream.iterator();
+        while (it.hasNext()) {
+            MessageAndMetadata item = it.next();
+            write(new StreamsDatum((String)item.message(), (String)item.key()));
         }
 
+        StreamsResultSet current;
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+        persistQueue.clear();
+
         return current;
     }
 
+    private void write( StreamsDatum entry ) {
+        persistQueue.offer(entry);
+    }
+
     @Override
     public StreamsResultSet readNew(BigInteger bigInteger) {
         return null;
@@ -112,28 +121,31 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
 
     @Override
     public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
+        return isStarted && !isStopped;
     }
 
     @Override
     public void prepare(Object configurationObject) {
 
         Properties props = new Properties();
+
         props.put("zookeeper.connect", config.getZkconnect());
         props.put("group.id", "streams");
         props.put("zookeeper.session.timeout.ms", "1000");
         props.put("zookeeper.sync.time.ms", "200");
-        props.put("auto.commit.interval.ms", "1000");
+        props.put("auto.commit.interval.ms", "500");
         props.put("auto.offset.reset", "smallest");
 
+        VerifiableProperties vprops = new VerifiableProperties(props);
+
         consumerConfig = new ConsumerConfig(props);
 
         consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
-        Whitelist topics = new Whitelist(config.getTopic());
-        VerifiableProperties vprops = new VerifiableProperties(props);
-
-        inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+        Map topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(config.getTopic(), new Integer(1));
+        Map<String, List<KafkaStream<String,String>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(vprops), new StringDecoder(vprops));
+        stream = consumerMap.get(config.getTopic()).get(0);
 
         persistQueue = new ConcurrentLinkedQueue<>();
     }
@@ -141,5 +153,6 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
     @Override
     public void cleanUp() {
         consumerConnector.shutdown();
+        isStopped = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
deleted file mode 100644
index 3c71398..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class KafkaPersistReaderTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
-
-    private KafkaPersistReader reader;
-    private KafkaStream<String,Object> stream;
-
-    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,Object> stream) {
-        this.reader = reader;
-        this.stream = stream;
-    }
-
-    @Override
-    public void run() {
-
-        Preconditions.checkNotNull(this.stream);
-
-        ConsumerIterator<String, Object> it = stream.iterator();
-        while (it.hasNext()) {
-            MessageAndMetadata<String,Object> item = it.next();
-            write(new StreamsDatum(item.message()));
-        }
-        try {
-            Thread.sleep(new Random().nextInt(100));
-        } catch (InterruptedException e) {}
-
-    }
-
-    private void write( StreamsDatum entry ) {
-        boolean success;
-        do {
-            synchronized( KafkaPersistReader.class ) {
-                success = reader.persistQueue.offer(entry);
-            }
-            Thread.yield();
-        }
-        while( !success );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 2937b7a..7e300a8 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -20,6 +20,8 @@ package org.apache.streams.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
@@ -31,6 +33,9 @@ import org.apache.streams.util.GuidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Properties;
 import java.util.Queue;
@@ -42,11 +47,11 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper;
 
     private KafkaConfiguration config;
 
-    private Producer<String, Object> producer;
+    private Producer<String, String> producer;
 
     public KafkaPersistWriter() {
        this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
@@ -56,37 +61,38 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
         this.config = config;
     }
 
-    public void start() {
-        Properties props = new Properties();
-
-        props.put("metadata.broker.list", config.getBrokerlist());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
-        props.put("request.required.acks", "1");
-        props.put("auto.create.topics.enable", "true");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        producer = new Producer<String, Object>(config);
-
-        new Thread(new KafkaPersistWriterTask(this)).start();
-    }
-
     @Override
     public void write(StreamsDatum entry) {
 
         String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter");
 
-        KeyedMessage<String, Object> data = new KeyedMessage<>(config.getTopic(), key, entry.getDocument());
+        Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
+        Preconditions.checkArgument(entry.getDocument() instanceof String);
+        Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false);
+
+        KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), key, (String)entry.getDocument());
 
         producer.send(data);
     }
 
     @Override
     public void prepare(Object configurationObject) {
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
 
-        start();
+        mapper = new ObjectMapper();
+
+        Properties props = new Properties();
+
+        props.put("metadata.broker.list", config.getBrokerlist());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
+        props.put("request.required.acks", "1");
+        props.put("auto.create.topics.enable", "true");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        producer = new Producer(config);
+
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
deleted file mode 100644
index 4aa9707..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class KafkaPersistWriterTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
-
-    private KafkaPersistWriter writer;
-
-    public KafkaPersistWriterTask(KafkaPersistWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
index 5e2cab9..4dc339c 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
@@ -5,6 +5,11 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import javassist.bytecode.stackmap.TypeData;
 import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
 import kafka.utils.ZKStringSerializer$;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.streams.console.ConsolePersistReader;
@@ -16,6 +21,7 @@ import org.apache.streams.kafka.KafkaConfiguration;
 import org.apache.streams.kafka.KafkaPersistReader;
 import org.apache.streams.kafka.KafkaPersistWriter;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -24,6 +30,8 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,6 +45,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
 
@@ -75,6 +84,12 @@ public class KafkaPersistIT {
 
         AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
 
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
+
         PowerMockito.when(reader.readCurrent())
                 .thenReturn(
                         new StreamsResultSet(Queues.newConcurrentLinkedQueue(
@@ -103,9 +118,27 @@ public class KafkaPersistIT {
 
         builder.start();
 
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
+
         builder.stop();
 
         Mockito.verify(writer).write(testDatum);
 
     }
+
+    @After
+    public void shutdownTest() {
+        try {
+            testKafkaCluster.stop();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            testKafkaCluster = null;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
index 1d04f74..572b87e 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -10,12 +10,14 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.kafka.KafkaConfiguration;
 import org.apache.streams.kafka.KafkaPersistReader;
 import org.apache.streams.kafka.KafkaPersistWriter;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
 
+import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -31,21 +33,29 @@ public class TestKafkaPersist {
     @Before
     public void prepareTest() {
 
-
         try {
             testKafkaCluster = new TestKafkaCluster();
         } catch (Throwable e ) {
             e.printStackTrace();
         }
 
+        String zkConnect = testKafkaCluster.getZkConnectString().replace("127.0.0.1", "localhost");
+        String kafkaBroker = testKafkaCluster.getKafkaBrokerString().replace("127.0.0.1", "localhost");
+
         testConfiguration = new KafkaConfiguration();
-        testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
-        testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
+        testConfiguration.setBrokerlist(kafkaBroker);
+        testConfiguration.setZkconnect(zkConnect);
         testConfiguration.setTopic(testTopic);
 
-        ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
+        ZkClient zkClient = new ZkClient(testConfiguration.getZkconnect(), 1000, 1000, ZKStringSerializer$.MODULE$);
 
         AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            //Handle exception
+        }
     }
 
     @Test
@@ -55,7 +65,7 @@ public class TestKafkaPersist {
         assert(testKafkaCluster != null);
 
         KafkaPersistWriter testPersistWriter = new KafkaPersistWriter(testConfiguration);
-        testPersistWriter.prepare(null);
+        testPersistWriter.prepare(testConfiguration);
 
         try {
             Thread.sleep(1000);
@@ -77,14 +87,12 @@ public class TestKafkaPersist {
 
         KafkaPersistReader testPersistReader = new KafkaPersistReader(testConfiguration);
         try {
-            testPersistReader.prepare(null);
+            testPersistReader.prepare(testConfiguration);
         } catch( Throwable e ) {
             e.printStackTrace();
             Assert.fail();
         }
 
-        testPersistReader.startStream();
-
         try {
             Thread.sleep(1000);
         } catch (InterruptedException ie) {
@@ -98,4 +106,15 @@ public class TestKafkaPersist {
         assert(testResult.size() == 1);
 
     }
+
+    @After
+    public void shutdownTest() {
+        try {
+            testKafkaCluster.stop();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            testKafkaCluster = null;
+        }
+    }
 }