Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 18:01:15 UTC
[2/4] incubator-streams git commit: simplifying refactoring to kafka persist - NOTE neither test is passing in this commit
simplifying refactoring to kafka persist - NOTE neither test is passing in this commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/17374fb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/17374fb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/17374fb9
Branch: refs/heads/STREAMS-49
Commit: 17374fb9496efb8083e287b224733e79f307d384
Parents: 5f71bd7
Author: sblackmon <sb...@apache.org>
Authored: Mon Nov 10 18:41:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Mon Nov 10 18:41:18 2014 -0600
----------------------------------------------------------------------
.../streams/kafka/KafkaPersistReader.java | 53 +++++++++------
.../streams/kafka/KafkaPersistReaderTask.java | 70 --------------------
.../streams/kafka/KafkaPersistWriter.java | 48 ++++++++------
.../streams/kafka/KafkaPersistWriterTask.java | 56 ----------------
.../streams/kafka/test/KafkaPersistIT.java | 33 +++++++++
.../streams/kafka/test/TestKafkaPersist.java | 35 +++++++---
6 files changed, 120 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index a77c941..6c30be7 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -19,17 +19,18 @@
package org.apache.streams.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
@@ -37,13 +38,14 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.streams.kafka.KafkaConfiguration;
-
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Queue;
+import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -62,7 +64,10 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
private ConsumerConfig consumerConfig;
private ConsumerConnector consumerConnector;
- public List<KafkaStream<String, String>> inStreams;
+ public KafkaStream<String, String> stream;
+
+ private boolean isStarted = false;
+ private boolean isStopped = false;
private ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -81,25 +86,29 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
@Override
public void startStream() {
-
- for (final KafkaStream stream : inStreams) {
- executor.submit(new KafkaPersistReaderTask(this, stream));
- }
+ isStarted = true;
}
@Override
public StreamsResultSet readCurrent() {
- StreamsResultSet current;
-
- synchronized( KafkaPersistReader.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
- persistQueue.clear();
+ ConsumerIterator it = stream.iterator();
+ while (it.hasNext()) {
+ MessageAndMetadata item = it.next();
+ write(new StreamsDatum((String)item.message(), (String)item.key()));
}
+ StreamsResultSet current;
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ persistQueue.clear();
+
return current;
}
+ private void write( StreamsDatum entry ) {
+ persistQueue.offer(entry);
+ }
+
@Override
public StreamsResultSet readNew(BigInteger bigInteger) {
return null;
@@ -112,28 +121,31 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
@Override
public boolean isRunning() {
- return !executor.isShutdown() && !executor.isTerminated();
+ return isStarted && !isStopped;
}
@Override
public void prepare(Object configurationObject) {
Properties props = new Properties();
+
props.put("zookeeper.connect", config.getZkconnect());
props.put("group.id", "streams");
props.put("zookeeper.session.timeout.ms", "1000");
props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
+ props.put("auto.commit.interval.ms", "500");
props.put("auto.offset.reset", "smallest");
+ VerifiableProperties vprops = new VerifiableProperties(props);
+
consumerConfig = new ConsumerConfig(props);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
- Whitelist topics = new Whitelist(config.getTopic());
- VerifiableProperties vprops = new VerifiableProperties(props);
-
- inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+ Map topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(config.getTopic(), new Integer(1));
+ Map<String, List<KafkaStream<String,String>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(vprops), new StringDecoder(vprops));
+ stream = consumerMap.get(config.getTopic()).get(0);
persistQueue = new ConcurrentLinkedQueue<>();
}
@@ -141,5 +153,6 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
@Override
public void cleanUp() {
consumerConnector.shutdown();
+ isStopped = true;
}
}
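For context on the new readCurrent(): with the Kafka 0.8 high-level
consumer, ConsumerIterator.hasNext() blocks until a message arrives unless
"consumer.timeout.ms" is set, in which case it throws
ConsumerTimeoutException once the timeout elapses -- one plausible reason
neither test passes at this commit. Below is a minimal sketch, not part of
the commit, of a bounded drain; it assumes consumer.timeout.ms is added to
the consumer Properties (it is not set above).

package org.apache.streams.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.streams.core.StreamsDatum;

import java.util.Queue;

class BoundedDrainSketch {

    // Drain everything currently available on the stream, then return.
    // Requires something like props.put("consumer.timeout.ms", "500") when
    // building the ConsumerConfig; otherwise hasNext() never times out.
    static void drain(KafkaStream<String, String> stream,
                      Queue<StreamsDatum> persistQueue) {
        ConsumerIterator<String, String> it = stream.iterator();
        try {
            while (it.hasNext()) {
                MessageAndMetadata<String, String> item = it.next();
                persistQueue.offer(new StreamsDatum(item.message(), item.key()));
            }
        } catch (ConsumerTimeoutException done) {
            // Timeout elapsed with no new message: stop draining and let
            // readCurrent() return whatever has accumulated.
        }
    }
}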
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
deleted file mode 100644
index 3c71398..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class KafkaPersistReaderTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
-
- private KafkaPersistReader reader;
- private KafkaStream<String,Object> stream;
-
- public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,Object> stream) {
- this.reader = reader;
- this.stream = stream;
- }
-
- @Override
- public void run() {
-
- Preconditions.checkNotNull(this.stream);
-
- ConsumerIterator<String, Object> it = stream.iterator();
- while (it.hasNext()) {
- MessageAndMetadata<String,Object> item = it.next();
- write(new StreamsDatum(item.message()));
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
-
- }
-
- private void write( StreamsDatum entry ) {
- boolean success;
- do {
- synchronized( KafkaPersistReader.class ) {
- success = reader.persistQueue.offer(entry);
- }
- Thread.yield();
- }
- while( !success );
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 2937b7a..7e300a8 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -20,6 +20,8 @@ package org.apache.streams.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.typesafe.config.Config;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
@@ -31,6 +33,9 @@ import org.apache.streams.util.GuidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;
import java.util.Queue;
@@ -42,11 +47,11 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper;
private KafkaConfiguration config;
- private Producer<String, Object> producer;
+ private Producer<String, String> producer;
public KafkaPersistWriter() {
this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
@@ -56,37 +61,38 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
this.config = config;
}
- public void start() {
- Properties props = new Properties();
-
- props.put("metadata.broker.list", config.getBrokerlist());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
- props.put("request.required.acks", "1");
- props.put("auto.create.topics.enable", "true");
-
- ProducerConfig config = new ProducerConfig(props);
-
- producer = new Producer<String, Object>(config);
-
- new Thread(new KafkaPersistWriterTask(this)).start();
- }
-
@Override
public void write(StreamsDatum entry) {
String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter");
- KeyedMessage<String, Object> data = new KeyedMessage<>(config.getTopic(), key, entry.getDocument());
+ Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
+ Preconditions.checkArgument(entry.getDocument() instanceof String);
+ Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false);
+
+ KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), key, (String)entry.getDocument());
producer.send(data);
}
@Override
public void prepare(Object configurationObject) {
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- start();
+ mapper = new ObjectMapper();
+
+ Properties props = new Properties();
+
+ props.put("metadata.broker.list", config.getBrokerlist());
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
+ props.put("request.required.acks", "1");
+ props.put("auto.create.topics.enable", "true");
+
+ ProducerConfig config = new ProducerConfig(props);
+
+ producer = new Producer(config);
+
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
deleted file mode 100644
index 4aa9707..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class KafkaPersistWriterTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
-
- private KafkaPersistWriter writer;
-
- public KafkaPersistWriterTask(KafkaPersistWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public void run() {
-
- while(true) {
- if( writer.persistQueue.peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
index 5e2cab9..4dc339c 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
@@ -5,6 +5,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import javassist.bytecode.stackmap.TypeData;
import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.streams.console.ConsolePersistReader;
@@ -16,6 +21,7 @@ import org.apache.streams.kafka.KafkaConfiguration;
import org.apache.streams.kafka.KafkaPersistReader;
import org.apache.streams.kafka.KafkaPersistWriter;
import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -24,6 +30,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +45,7 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
@@ -75,6 +84,12 @@ public class KafkaPersistIT {
AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
+
PowerMockito.when(reader.readCurrent())
.thenReturn(
new StreamsResultSet(Queues.newConcurrentLinkedQueue(
@@ -103,9 +118,27 @@ public class KafkaPersistIT {
builder.start();
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
+
builder.stop();
Mockito.verify(writer).write(testDatum);
}
+
+ @After
+ public void shutdownTest() {
+ try {
+ testKafkaCluster.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ testKafkaCluster = null;
+ }
+ }
+
}
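The fixed Thread.sleep(5000) calls paper over topic-creation and stream
startup latency. One hedged alternative for the first of them is to poll
ZooKeeper until the topic is registered; AdminUtils.topicExists(ZkClient,
String) is the 0.8 admin call, and the timeout below is arbitrary.

package org.apache.streams.kafka.test;

import kafka.admin.AdminUtils;
import org.I0Itec.zkclient.ZkClient;

class TopicReadySketch {

    // Poll until AdminUtils reports the topic, instead of sleeping a
    // fixed five seconds and hoping the broker has caught up.
    static boolean waitForTopic(ZkClient zkClient, String topic, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (AdminUtils.topicExists(zkClient, topic)) {
                return true;
            }
            Thread.sleep(100);
        }
        return false;
    }
}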
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/17374fb9/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
index 1d04f74..572b87e 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -10,12 +10,14 @@ import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.kafka.KafkaConfiguration;
import org.apache.streams.kafka.KafkaPersistReader;
import org.apache.streams.kafka.KafkaPersistWriter;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Seq;
+import java.io.IOException;
import java.util.Properties;
/**
@@ -31,21 +33,29 @@ public class TestKafkaPersist {
@Before
public void prepareTest() {
-
try {
testKafkaCluster = new TestKafkaCluster();
} catch (Throwable e ) {
e.printStackTrace();
}
+ String zkConnect = testKafkaCluster.getZkConnectString().replace("127.0.0.1", "localhost");
+ String kafkaBroker = testKafkaCluster.getKafkaBrokerString().replace("127.0.0.1", "localhost");
+
testConfiguration = new KafkaConfiguration();
- testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
- testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
+ testConfiguration.setBrokerlist(kafkaBroker);
+ testConfiguration.setZkconnect(zkConnect);
testConfiguration.setTopic(testTopic);
- ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
+ ZkClient zkClient = new ZkClient(testConfiguration.getZkconnect(), 1000, 1000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ //Handle exception
+ }
}
@Test
@@ -55,7 +65,7 @@ public class TestKafkaPersist {
assert(testKafkaCluster != null);
KafkaPersistWriter testPersistWriter = new KafkaPersistWriter(testConfiguration);
- testPersistWriter.prepare(null);
+ testPersistWriter.prepare(testConfiguration);
try {
Thread.sleep(1000);
@@ -77,14 +87,12 @@ public class TestKafkaPersist {
KafkaPersistReader testPersistReader = new KafkaPersistReader(testConfiguration);
try {
- testPersistReader.prepare(null);
+ testPersistReader.prepare(testConfiguration);
} catch( Throwable e ) {
e.printStackTrace();
Assert.fail();
}
- testPersistReader.startStream();
-
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
@@ -98,4 +106,15 @@ public class TestKafkaPersist {
assert(testResult.size() == 1);
}
+
+ @After
+ public void shutdownTest() {
+ try {
+ testKafkaCluster.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ testKafkaCluster = null;
+ }
+ }
}
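Taken together, the test now exercises the round trip sketched below. Note
that, as committed, readCurrent() can still block inside
ConsumerIterator.hasNext() (see the consumer.timeout.ms note under
KafkaPersistReader), which is consistent with neither test passing here.
The sketch assumes the KafkaConfiguration points at a running broker and
ZooKeeper, which the embedded TestKafkaCluster provides in the real test.

package org.apache.streams.kafka.test;

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.kafka.KafkaConfiguration;
import org.apache.streams.kafka.KafkaPersistReader;
import org.apache.streams.kafka.KafkaPersistWriter;

class RoundTripSketch {

    static void roundTrip(KafkaConfiguration config) {
        KafkaPersistWriter writer = new KafkaPersistWriter(config);
        writer.prepare(config);
        writer.write(new StreamsDatum("{\"message\":\"ping\"}", "ping-1"));

        KafkaPersistReader reader = new KafkaPersistReader(config);
        reader.prepare(config);
        reader.startStream(); // now only flips isStarted; consumption happens in readCurrent()
        StreamsResultSet result = reader.readCurrent();
        // the real test asserts result.size() == 1
        reader.cleanUp();
    }
}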