You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/31 01:29:56 UTC
git commit: bumped deps very basic test passing
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-49 [created] 24e427fab
bumped deps
very basic test passing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/24e427fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/24e427fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/24e427fa
Branch: refs/heads/STREAMS-49
Commit: 24e427fabe1a23cafc2a8dd16bfc9891a50a805b
Parents: eec3aa9
Author: sblackmon <sb...@apache.org>
Authored: Thu Oct 30 19:28:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Thu Oct 30 19:28:46 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-persist-kafka/README.md | 21 ++++
streams-contrib/streams-persist-kafka/pom.xml | 57 ++++++-----
.../streams/kafka/KafkaPersistReader.java | 74 ++++++--------
.../streams/kafka/KafkaPersistReaderTask.java | 22 ++--
.../streams/kafka/KafkaPersistWriter.java | 43 ++------
.../streams/kafka/KafkaPersistWriterTask.java | 2 +-
.../streams/kafka/StreamsPartitioner.java | 10 +-
.../src/main/resources/reference.properties | 16 ---
.../streams/kafka/test/TestKafkaCluster.java | 51 ++++++++++
.../streams/kafka/test/TestKafkaPersist.java | 101 +++++++++++++++++++
10 files changed, 261 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/README.md b/streams-contrib/streams-persist-kafka/README.md
new file mode 100644
index 0000000..dce830e
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/README.md
@@ -0,0 +1,21 @@
+streams-persist-kafka
+=====================
+
+Read and write to Kafka
+
+Example reader/writer configuration:
+
+ kafka.metadata.broker.list=localhost:9092
+
+ kafka.zk.connect=localhost:2181
+
+ kafka.topic=topic
+
+ kafka.groupid=group
+
+java -cp jar -Dconfig.file={json/hocon typesafe config} \
+ -Dkafka.metadata.broker.list=localhost:9092 class \
+ -Dkafka.zk.connect=localhost:2181 \
+ -Dkafka.topic=topic \
+ -Dkafka.groupid=group
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index f86a0f3..4e9e375 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -12,8 +12,8 @@
<artifactId>streams-persist-kafka</artifactId>
<properties>
- <scala.version>2.9.2</scala.version>
- <kafka.version>0.8.0</kafka.version>
+ <scala.version>2.10</scala.version>
+ <kafka.version>0.8.1.1</kafka.version>
</properties>
<dependencies>
@@ -43,14 +43,6 @@
<version>${kafka.version}</version>
<exclusions>
<exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
@@ -67,35 +59,46 @@
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
- <version>0.3</version>
+ <version>0.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index a7810b1..a77c941 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -19,6 +19,7 @@
package org.apache.streams.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
import com.typesafe.config.Config;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
@@ -65,50 +67,37 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
private ExecutorService executor = Executors.newSingleThreadExecutor();
public KafkaPersistReader() {
- Config config = StreamsConfigurator.config.getConfig("kafka");
- this.config = KafkaConfigurator.detectConfiguration(config);
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
}
- public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
- Config config = StreamsConfigurator.config.getConfig("kafka");
- this.config = KafkaConfigurator.detectConfiguration(config);
- this.persistQueue = persistQueue;
+ public KafkaPersistReader(KafkaConfiguration config) {
+ this.config = config;
}
- public void setConfig(KafkaConfiguration config) {
- this.config = config;
+ @Override
+ public StreamsResultSet readAll() {
+ return readCurrent();
}
@Override
public void startStream() {
- Properties props = new Properties();
- props.setProperty("serializer.encoding", "UTF8");
-
- consumerConfig = new ConsumerConfig(props);
-
- consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
-
- Whitelist topics = new Whitelist(config.getTopic());
- VerifiableProperties vprops = new VerifiableProperties(props);
-
- inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
-
for (final KafkaStream stream : inStreams) {
executor.submit(new KafkaPersistReaderTask(this, stream));
}
-
- }
-
- @Override
- public StreamsResultSet readAll() {
- return readCurrent();
}
@Override
public StreamsResultSet readCurrent() {
- return null;
+
+ StreamsResultSet current;
+
+ synchronized( KafkaPersistReader.class ) {
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ persistQueue.clear();
+ }
+
+ return current;
}
@Override
@@ -126,28 +115,31 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
return !executor.isShutdown() && !executor.isTerminated();
}
- private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+ @Override
+ public void prepare(Object configurationObject) {
+
Properties props = new Properties();
- props.put("zookeeper.connect", a_zookeeper);
- props.put("group.id", a_groupId);
- props.put("zookeeper.session.timeout.ms", "400");
+ props.put("zookeeper.connect", config.getZkconnect());
+ props.put("group.id", "streams");
+ props.put("zookeeper.session.timeout.ms", "1000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
- }
+ props.put("auto.offset.reset", "smallest");
- @Override
- public void prepare(Object configurationObject) {
+ consumerConfig = new ConsumerConfig(props);
+
+ consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+
+ Whitelist topics = new Whitelist(config.getTopic());
+ VerifiableProperties vprops = new VerifiableProperties(props);
+ inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+
+ persistQueue = new ConcurrentLinkedQueue<>();
}
@Override
public void cleanUp() {
consumerConnector.shutdown();
- while( !executor.isTerminated()) {
- try {
- executor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {}
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 83493e0..03fa291 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -18,6 +18,7 @@
package org.apache.streams.kafka;
+import com.google.common.base.Preconditions;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
@@ -44,18 +45,17 @@ public class KafkaPersistReaderTask implements Runnable {
@Override
public void run() {
- MessageAndMetadata<String,String> item;
- while(true) {
-
- ConsumerIterator<String, String> it = stream.iterator();
- while (it.hasNext()) {
- item = it.next();
- reader.persistQueue.add(new StreamsDatum(item.message()));
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
+ Preconditions.checkNotNull(this.stream);
+
+ ConsumerIterator<String, String> it = stream.iterator();
+ while (it.hasNext()) {
+ MessageAndMetadata<String,String> item = it.next();
+ reader.persistQueue.add(new StreamsDatum(item.message()));
}
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) {}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index c5f029a..01db32f 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -36,7 +36,7 @@ import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
@@ -49,18 +49,10 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
private Producer<String, String> producer;
public KafkaPersistWriter() {
- Config config = StreamsConfigurator.config.getConfig("kafka");
- this.config = KafkaConfigurator.detectConfiguration(config);
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
-
- public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
- Config config = StreamsConfigurator.config.getConfig("kafka");
- this.config = KafkaConfigurator.detectConfiguration(config);
- this.persistQueue = persistQueue;
+ this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
}
- public void setConfig(KafkaConfiguration config) {
+ public KafkaPersistWriter(KafkaConfiguration config) {
this.config = config;
}
@@ -71,6 +63,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
props.put("request.required.acks", "1");
+ props.put("auto.create.topics.enable", "true");
ProducerConfig config = new ProducerConfig(props);
@@ -79,27 +72,13 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
new Thread(new KafkaPersistWriterTask(this)).start();
}
- public void stop() {
- producer.close();
- }
-
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
-
- public Queue<StreamsDatum> getPersistQueue() {
- return this.persistQueue;
- }
-
@Override
public void write(StreamsDatum entry) {
try {
String text = mapper.writeValueAsString(entry);
- String hash = GuidUtils.generateGuid(text);
-
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), hash, text);
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), entry.getId(), text);
producer.send(data);
@@ -109,19 +88,15 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R
}
@Override
- public void run() {
- start();
-
- // stop();
- }
-
- @Override
public void prepare(Object configurationObject) {
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
start();
+
}
@Override
public void cleanUp() {
- stop();
+ producer.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
index 5d8ee9e..4aa9707 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
@@ -38,7 +38,7 @@ public class KafkaPersistWriterTask implements Runnable {
public void run() {
while(true) {
- if( writer.getPersistQueue().peek() != null ) {
+ if( writer.persistQueue.peek() != null ) {
try {
StreamsDatum entry = writer.persistQueue.remove();
writer.write(entry);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index ebfff9a..fa38ca2 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -24,17 +24,15 @@ package org.apache.streams.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
-public class StreamsPartitioner implements Partitioner<String> {
+public class StreamsPartitioner implements Partitioner {
+
public StreamsPartitioner (VerifiableProperties props) {
}
- public int partition(String key, int a_numPartitions) {
+ public int partition(Object key, int a_numPartitions) {
int partition = 0;
- int offset = key.lastIndexOf('.');
- if (offset > 0) {
- partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
- }
+ partition = key.hashCode() % a_numPartitions;
return partition;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
index 967264d..164c990 100644
--- a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
+++ b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
@@ -20,22 +20,6 @@ kafka.serializer.class=kafka.serializer.DefaultEncoder
# allow topic level compression
#compressed.topics=
-############################# Async Producer #############################
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-#batch.num.messages=
-
kafka.groupid=kafka
kafka.zk.connect=localhost:2181
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
new file mode 100644
index 0000000..167bc8c
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java
@@ -0,0 +1,51 @@
+package org.apache.streams.kafka.test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.TestUtils;
+
+import org.apache.curator.test.TestingServer;
+
+public class TestKafkaCluster {
+ KafkaServerStartable kafkaServer;
+ TestingServer zkServer;
+
+ public TestKafkaCluster() throws Exception {
+ zkServer = new TestingServer();
+ KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
+ kafkaServer = new KafkaServerStartable(config);
+ kafkaServer.startup();
+ }
+
+ private static KafkaConfig getKafkaConfig(final String
+ zkConnectString) {
+ scala.collection.Iterator<Properties> propsI =
+ TestUtils.createBrokerConfigs(1).iterator();
+ assert propsI.hasNext();
+ Properties props = propsI.next();
+ assert props.containsKey("zookeeper.connect");
+ props.put("zookeeper.connect", zkConnectString);
+ return new KafkaConfig(props);
+ }
+
+ public String getKafkaBrokerString() {
+ return String.format("localhost:%d",
+ kafkaServer.serverConfig().port());
+ }
+
+ public String getZkConnectString() {
+ return zkServer.getConnectString();
+ }
+
+ public int getKafkaPort() {
+ return kafkaServer.serverConfig().port();
+ }
+
+ public void stop() throws IOException {
+ kafkaServer.shutdown();
+ zkServer.stop();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
new file mode 100644
index 0000000..1d04f74
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -0,0 +1,101 @@
+package org.apache.streams.kafka.test;
+
+import com.google.common.collect.Lists;
+import kafka.admin.AdminUtils;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.kafka.KafkaConfiguration;
+import org.apache.streams.kafka.KafkaPersistReader;
+import org.apache.streams.kafka.KafkaPersistWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.util.Properties;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestKafkaPersist {
+
+ private TestKafkaCluster testKafkaCluster;
+ private KafkaConfiguration testConfiguration;
+
+ private String testTopic = "testTopic";
+
+ @Before
+ public void prepareTest() {
+
+
+ try {
+ testKafkaCluster = new TestKafkaCluster();
+ } catch (Throwable e ) {
+ e.printStackTrace();
+ }
+
+ testConfiguration = new KafkaConfiguration();
+ testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
+ testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
+ testConfiguration.setTopic(testTopic);
+
+ ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
+
+ AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+ }
+
+ @Test
+ public void testPersistWriterString() {
+
+ assert(testConfiguration != null);
+ assert(testKafkaCluster != null);
+
+ KafkaPersistWriter testPersistWriter = new KafkaPersistWriter(testConfiguration);
+ testPersistWriter.prepare(null);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
+
+ String testJsonString = "{\"dummy\":\"true\"}";
+
+ testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+ testPersistWriter.cleanUp();
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
+
+ KafkaPersistReader testPersistReader = new KafkaPersistReader(testConfiguration);
+ try {
+ testPersistReader.prepare(null);
+ } catch( Throwable e ) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ testPersistReader.startStream();
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
+
+ StreamsResultSet testResult = testPersistReader.readCurrent();
+
+ testPersistReader.cleanUp();
+
+ assert(testResult.size() == 1);
+
+ }
+}