You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/05 06:37:04 UTC
apex-malhar git commit: APEXMALHAR-2298 Making the
KafkaExactlyOnceOutputOperator take generic object as input.
Repository: apex-malhar
Updated Branches:
refs/heads/master 70154f641 -> 70caa8909
APEXMALHAR-2298 Making the KafkaExactlyOnceOutputOperator take generic object as input.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70caa890
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70caa890
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70caa890
Branch: refs/heads/master
Commit: 70caa8909284b6cebd62b3b96ad817432b5e9df7
Parents: 70154f6
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Tue Nov 8 11:25:16 2016 -0800
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Wed Jan 4 09:01:57 2017 -0800
----------------------------------------------------------------------
...afkaSinglePortExactlyOnceOutputOperator.java | 54 ++++---
.../apache/apex/malhar/kafka/KafkaHelper.java | 65 ++++++++
.../malhar/kafka/KafkaOutputOperatorTest.java | 159 ++++++++++++-------
3 files changed, 197 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 6511cd4..ff16610 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -32,14 +32,13 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -55,6 +54,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
* Kafka output operator with exactly once processing semantics.
*<br>
*
+ * <p>
+ * <b>Requirements</b>
+ * <li>In the Kafka message, only Value will be available for users</li>
+ * <li>Users need to provide Value deserializers for Kafka message as it is used during recovery</li>
+ * <li>Value type should have well defined Equals & HashCodes, as during messages are stored in HashMaps for comparison.</li>
* <p>
* <b>Recovery handling</b>
* <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li>
@@ -106,8 +110,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
private final int KAFKA_CONNECT_ATTEMPT = 10;
private final String KEY_SEPARATOR = "#";
- private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
{
@@ -121,12 +125,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
@Override
public void setup(Context.OperatorContext context)
{
+ setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+
+ if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery.");
+ }
+
super.setup(context);
this.operatorId = context.getId();
this.windowDataManager.setup(context);
this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
this.key = appName + KEY_SEPARATOR + (new Integer(operatorId));
+
this.consumer = KafkaConsumerInit();
}
@@ -211,7 +222,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
private boolean alreadyInKafka(T message)
{
- if ( windowId <= windowDataManager.getLargestCompletedWindow() ) {
+ if (windowId <= windowDataManager.getLargestCompletedWindow()) {
return true;
}
@@ -219,17 +230,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
Integer val = partialWindowTuples.get(message);
- if ( val == 0 ) {
+ if (val == 0) {
return false;
- } else if ( val == 1 ) {
+ } else if (val == 1) {
partialWindowTuples.remove(message);
} else {
partialWindowTuples.put(message, val - 1);
}
-
return true;
}
-
return false;
}
@@ -245,10 +254,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
Map<Integer,Long> parttionsAndOffset = new HashMap<>();
consumer.assign(topicPartitionList);
- for ( PartitionInfo partitionInfo: partitionInfoList) {
-
+ for (PartitionInfo partitionInfo: partitionInfoList) {
try {
-
TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
if (latest) {
consumer.seekToEnd(topicPartition);
@@ -256,7 +263,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
consumer.seekToBeginning(topicPartition);
}
parttionsAndOffset.put(partitionInfo.partition(), consumer.position(topicPartition));
-
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -280,13 +286,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
}
if (currentOffsets == null) {
- logger.debug("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
+ logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
return;
}
if (storedOffsets == null) {
- logger.debug("Stored offset not available, seeking to the beginning of the Kafka Partition.");
+ logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
try {
storedOffsets = getPartitionsAndOffsets(false);
@@ -298,14 +304,12 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
List<TopicPartition> topicPartitions = new ArrayList<>();
for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
-
topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
}
consumer.assign(topicPartitions);
for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
-
Long storedOffset = 0L;
Integer currentPartition = entry.getKey();
Long currentOffset = entry.getValue();
@@ -327,9 +331,9 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
int kafkaAttempt = 0;
- while ( true ) {
+ while (true) {
- ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
+ ConsumerRecords<String, T> consumerRecords = consumer.poll(100);
if (consumerRecords.count() == 0) {
if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
@@ -341,15 +345,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
boolean crossedBoundary = false;
- for (ConsumerRecord consumerRecord : consumerRecords) {
+ for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
- if (!doesKeyBelongsToThisInstance(operatorId, (String)consumerRecord.key())) {
+ if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
continue;
}
- T value = (T)consumerRecord.value();
+ T value = consumerRecord.value();
- if ( partialWindowTuples.containsKey(value)) {
+ if (partialWindowTuples.containsKey(value)) {
Integer count = partialWindowTuples.get(value);
partialWindowTuples.put(value, count + 1);
} else {
@@ -375,14 +379,14 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
- props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG));
return new KafkaConsumer<>(props);
}
protected void sendTuple(T tuple)
{
- if ( alreadyInKafka(tuple) ) {
+ if (alreadyInKafka(tuple)) {
return;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
new file mode 100644
index 0000000..c550032
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person>
+{
+ @Override
+ public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int nameLength = byteBuffer.getInt();
+ byte[] name = new byte[nameLength];
+
+ byteBuffer.get(name, 0, nameLength);
+
+ return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
+ }
+
+ @Override
+ public byte[] serialize(String s, KafkaOutputOperatorTest.Person person)
+ {
+ byte[] name = person.name.getBytes();
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4);
+
+ byteBuffer.putInt(name.length);
+ byteBuffer.put(name);
+ byteBuffer.putInt(person.age);
+
+ return byteBuffer.array();
+ }
+
+ @Override
+ public void configure(Map<String, ?> map, boolean b)
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index 5d0e59a..58d69f6 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -25,6 +25,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
@@ -33,32 +44,20 @@ import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.stram.StramLocalCluster;
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-@Ignore
public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
{
String testName;
- private static List<String> tupleCollection = new LinkedList<>();
- private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static List<Person> tupleCollection = new LinkedList<>();
+ private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+ private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
- public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+ public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
@Before
public void before()
@@ -71,14 +70,20 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
}
}
+ @After
+ public void after()
+ {
+ FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+ }
+
@Test
public void testExactlyOnceWithFailure() throws Exception
{
- List<String> toKafka = GenerateList();
+ List<Person> toKafka = GenerateList();
sendDataToKafka(true, toKafka, true, false);
- List<String> fromKafka = ReadFromKafka();
+ List<Person> fromKafka = ReadFromKafka();
Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
}
@@ -86,11 +91,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
@Test
public void testExactlyOnceWithNoFailure() throws Exception
{
- List<String> toKafka = GenerateList();
+ List<Person> toKafka = GenerateList();
sendDataToKafka(true, toKafka, false, false);
- List<String> fromKafka = ReadFromKafka();
+ List<Person> fromKafka = ReadFromKafka();
Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
}
@@ -98,14 +103,14 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
@Test
public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
{
- List<String> toKafka = GenerateList();
+ List<Person> toKafka = GenerateList();
try {
sendDataToKafka(true, toKafka, true, true);
} catch (RuntimeException ex) {
boolean expectedException = false;
- if ( ex.getMessage().contains("Violates")) {
+ if (ex.getMessage().contains("Violates")) {
expectedException = true;
}
@@ -119,11 +124,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
@Test
public void testKafkaOutput() throws Exception
{
- List<String> toKafka = GenerateList();
+ List<Person> toKafka = GenerateList();
sendDataToKafka(false, toKafka, false, false);
- List<String> fromKafka = ReadFromKafka();
+ List<Person> fromKafka = ReadFromKafka();
Assert.assertTrue("No failure", compare(fromKafka, toKafka));
}
@@ -131,38 +136,42 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
@Test
public void testKafkaOutputWithFailure() throws Exception
{
- List<String> toKafka = GenerateList();
+ List<Person> toKafka = GenerateList();
sendDataToKafka(false, toKafka, true, true);
- List<String> fromKafka = ReadFromKafka();
+ List<Person> fromKafka = ReadFromKafka();
Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
}
- private void sendDataToKafka(boolean exactlyOnce, List<String> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
+ private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
{
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ if (!exactlyOnce) {
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
+ }
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
+ attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
cleanUp(operatorContext);
Operator kafkaOutput;
- DefaultInputPort<String> inputPort;
+ DefaultInputPort<Person> inputPort;
- if ( exactlyOnce ) {
- KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+ if (exactlyOnce) {
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
} else {
- KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
}
@@ -181,14 +190,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
inputPort.getSink().put(toKafka.get(6));
inputPort.getSink().put(toKafka.get(7));
- if ( hasFailure ) {
-
- if ( exactlyOnce ) {
- KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+ if (hasFailure) {
+ if (exactlyOnce) {
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
} else {
- KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+ KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
}
@@ -217,9 +225,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
cleanUp(operatorContext);
}
- private KafkaSinglePortExactlyOnceOutputOperator<String> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+ private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
{
- KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
kafkaOutput.setTopic(testName);
kafkaOutput.setProperties(props);
kafkaOutput.setup(operatorContext);
@@ -227,9 +235,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
return kafkaOutput;
}
- private KafkaSinglePortOutputOperator<String,String> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+ private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
{
- KafkaSinglePortOutputOperator<String,String> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+ KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
kafkaOutput.setTopic(testName);
kafkaOutput.setProperties(props);
kafkaOutput.setup(operatorContext);
@@ -239,7 +247,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
private void cleanUp(Context.OperatorContext operatorContext)
{
- WindowDataManager windowDataManager = new FSWindowDataManager();
+ FSWindowDataManager windowDataManager = new FSWindowDataManager();
windowDataManager.setup(operatorContext);
try {
windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
@@ -248,7 +256,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
}
}
- private boolean compare(List<String> fromKafka, List<String> toKafka)
+ private boolean compare(List<Person> fromKafka, List<Person> toKafka)
{
if (fromKafka.size() != toKafka.size()) {
return false;
@@ -272,19 +280,18 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
(hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
}
- private List<String> GenerateList()
+ private List<Person> GenerateList()
{
- List<String> strings = new ArrayList<>();
+ List<Person> people = new ArrayList<>();
for (Integer i = 0; i < 12; ++i) {
-
- strings.add(i.toString());
+ people.add(new Person(i.toString(), i));
}
- return strings;
+ return people;
}
- public List<String> ReadFromKafka()
+ private List<Person> ReadFromKafka()
{
tupleCollection.clear();
@@ -292,11 +299,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
- props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
props.put(GROUP_ID_CONFIG, "KafkaTest");
-
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
@@ -316,7 +322,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
// Connect ports
dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
-
// Create local cluster
final LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
@@ -328,7 +333,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
public static class CollectorModule extends BaseOperator
{
-
public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
long currentWindowId;
@@ -353,7 +357,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
{
super.endWindow();
}
-
}
public static class CollectorInputPort extends DefaultInputPort<byte[]>
@@ -368,8 +371,52 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
@Override
public void process(byte[] bt)
{
- String tuple = new String(bt);
- tupleCollection.add(tuple);
+ tupleCollection.add(new KafkaHelper().deserialize("r", bt));
+ }
+ }
+
+ public static class Person
+ {
+ public String name;
+ public Integer age;
+
+ public Person(String name, Integer age)
+ {
+ this.name = name;
+ this.age = age;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Person person = (Person)o;
+
+ if (name != null ? !name.equals(person.name) : person.name != null) {
+ return false;
+ }
+
+ return age != null ? age.equals(person.age) : person.age == null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + (age != null ? age.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name + age.toString();
}
}
}