Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/05 06:37:04 UTC

apex-malhar git commit: APEXMALHAR-2298 Making the KafkaExactlyOnceOutputOperator take a generic object as input.

Repository: apex-malhar
Updated Branches:
  refs/heads/master 70154f641 -> 70caa8909


APEXMALHAR-2298 Making the KafkaExactlyOnceOutputOperator take a generic object as input.

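This makes the exactly-once output operator usable with arbitrary POJO tuple
types, as long as a Kafka value serializer/deserializer pair is configured in
the operator properties. A minimal usage sketch (not part of this commit;
MyPojo and com.example.MyPojoSerde are hypothetical placeholders for a
user-defined type and a class implementing Kafka's Serializer and
Deserializer interfaces, as KafkaHelper does in the test below):

  KafkaSinglePortExactlyOnceOutputOperator<MyPojo> out =
      new KafkaSinglePortExactlyOnceOutputOperator<>();
  out.setTopic("myTopic");

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  // Producer-side serializer used when writing tuples to Kafka.
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.MyPojoSerde");
  // Consumer-side deserializer; setup() now fails fast if this is missing,
  // because the operator re-reads its own output during recovery.
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.MyPojoSerde");
  out.setProperties(props);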

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70caa890
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70caa890
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70caa890

Branch: refs/heads/master
Commit: 70caa8909284b6cebd62b3b96ad817432b5e9df7
Parents: 70154f6
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Tue Nov 8 11:25:16 2016 -0800
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Wed Jan 4 09:01:57 2017 -0800

----------------------------------------------------------------------
 ...afkaSinglePortExactlyOnceOutputOperator.java |  54 ++++---
 .../apache/apex/malhar/kafka/KafkaHelper.java   |  65 ++++++++
 .../malhar/kafka/KafkaOutputOperatorTest.java   | 159 ++++++++++++-------
 3 files changed, 197 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 6511cd4..ff16610 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -32,14 +32,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
@@ -55,6 +54,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
  * Kafka output operator with exactly once processing semantics.
  *<br>
  *
+ *  <p>
+ * <b>Requirements</b>
+ * <li>In the Kafka message, only the value is available to users; the key is reserved for use by the operator</li>
+ * <li>Users need to provide a value deserializer for the Kafka messages, as it is used during recovery</li>
+ * <li>The value type should have well-defined equals() and hashCode() methods, as messages are stored in HashMaps for comparison during recovery.</li>
  * <p>
  * <b>Recovery handling</b>
  * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li>
@@ -106,8 +110,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   private final int KAFKA_CONNECT_ATTEMPT = 10;
   private final String KEY_SEPARATOR = "#";
 
-  private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-  private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
 
   public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
   {
@@ -121,12 +125,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   @Override
   public void setup(Context.OperatorContext context)
   {
+    setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+
+    if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+      throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery.");
+    }
+
     super.setup(context);
 
     this.operatorId = context.getId();
     this.windowDataManager.setup(context);
     this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
     this.key = appName + KEY_SEPARATOR + (new Integer(operatorId));
+
     this.consumer = KafkaConsumerInit();
   }
 
@@ -211,7 +222,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
   private boolean alreadyInKafka(T message)
   {
-    if ( windowId <= windowDataManager.getLargestCompletedWindow() ) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       return true;
     }
 
@@ -219,17 +230,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
       Integer val = partialWindowTuples.get(message);
 
-      if ( val == 0 ) {
+      if (val == 0) {
         return false;
-      } else if ( val == 1 ) {
+      } else if (val == 1) {
         partialWindowTuples.remove(message);
       } else {
         partialWindowTuples.put(message, val - 1);
       }
-
       return true;
     }
-
     return false;
   }
 
@@ -245,10 +254,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     Map<Integer,Long> parttionsAndOffset = new HashMap<>();
     consumer.assign(topicPartitionList);
 
-    for ( PartitionInfo partitionInfo: partitionInfoList) {
-
+    for (PartitionInfo partitionInfo: partitionInfoList) {
       try {
-
         TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
         if (latest) {
           consumer.seekToEnd(topicPartition);
@@ -256,7 +263,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
           consumer.seekToBeginning(topicPartition);
         }
         parttionsAndOffset.put(partitionInfo.partition(), consumer.position(topicPartition));
-
       } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
@@ -280,13 +286,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     }
 
     if (currentOffsets == null) {
-      logger.debug("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
+      logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
       return;
     }
 
     if (storedOffsets == null) {
 
-      logger.debug("Stored offset not available, seeking to the beginning of the Kafka Partition.");
+      logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
 
       try {
         storedOffsets = getPartitionsAndOffsets(false);
@@ -298,14 +304,12 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     List<TopicPartition> topicPartitions = new ArrayList<>();
 
     for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
-
       topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
     }
 
     consumer.assign(topicPartitions);
 
     for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
-
       Long storedOffset = 0L;
       Integer currentPartition = entry.getKey();
       Long currentOffset = entry.getValue();
@@ -327,9 +331,9 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
       int kafkaAttempt = 0;
 
-      while ( true ) {
+      while (true) {
 
-        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
+        ConsumerRecords<String, T> consumerRecords = consumer.poll(100);
 
         if (consumerRecords.count() == 0) {
           if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
@@ -341,15 +345,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
         boolean crossedBoundary = false;
 
-        for (ConsumerRecord consumerRecord : consumerRecords) {
+        for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
 
-          if (!doesKeyBelongsToThisInstance(operatorId, (String)consumerRecord.key())) {
+          if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
             continue;
           }
 
-          T value = (T)consumerRecord.value();
+          T value = consumerRecord.value();
 
-          if ( partialWindowTuples.containsKey(value)) {
+          if (partialWindowTuples.containsKey(value)) {
             Integer count = partialWindowTuples.get(value);
             partialWindowTuples.put(value, count + 1);
           } else {
@@ -375,14 +379,14 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
     props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
-    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG));
 
     return new KafkaConsumer<>(props);
   }
 
   protected void sendTuple(T tuple)
   {
-    if ( alreadyInKafka(tuple) ) {
+    if (alreadyInKafka(tuple)) {
       return;
     }
 

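Aside: the equals()/hashCode() requirement documented in the Javadoc above
exists because recovery rebuilds the partially written window as a value-keyed
multiset. A condensed, non-authoritative model of the dedup bookkeeping in the
operator (Kafka plumbing and window management omitted):

  import java.util.HashMap;
  import java.util.Map;

  class PartialWindowDedup<T>
  {
    // Counts of tuples already found in Kafka for the partial window;
    // lookups rely on T having value-based equals()/hashCode().
    private final Map<T, Integer> partialWindowTuples = new HashMap<>();

    // Called while replaying the partially written window from Kafka.
    void recordReplayed(T value)
    {
      Integer count = partialWindowTuples.get(value);
      partialWindowTuples.put(value, count == null ? 1 : count + 1);
    }

    // Called when upstream re-sends a tuple: true means this occurrence
    // was already written before the failure, so it must not be re-sent.
    boolean alreadyInKafka(T message)
    {
      Integer val = partialWindowTuples.get(message);
      if (val == null || val == 0) {
        return false;
      }
      if (val == 1) {
        partialWindowTuples.remove(message);
      } else {
        partialWindowTuples.put(message, val - 1);
      }
      return true;
    }
  }
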
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
new file mode 100644
index 0000000..c550032
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person>
+{
+  @Override
+  public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
+  {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int nameLength = byteBuffer.getInt();
+    byte[] name = new byte[nameLength];
+
+    byteBuffer.get(name, 0, nameLength);
+
+    return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
+  }
+
+  @Override
+  public byte[] serialize(String s, KafkaOutputOperatorTest.Person person)
+  {
+    byte[] name = person.name.getBytes();
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4);
+
+    byteBuffer.putInt(name.length);
+    byteBuffer.put(name);
+    byteBuffer.putInt(person.age);
+
+    return byteBuffer.array();
+  }
+
+  @Override
+  public void configure(Map<String, ?> map, boolean b)
+  {
+  }
+
+  @Override
+  public void close()
+  {
+  }
+}

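The helper above length-prefixes the name's bytes (platform-default charset)
and appends the age as a 4-byte int. A quick round-trip sketch, not in the
commit, of how the serializer and deserializer pair up (the topic argument is
ignored by this implementation):

  KafkaHelper helper = new KafkaHelper();
  KafkaOutputOperatorTest.Person alice = new KafkaOutputOperatorTest.Person("alice", 30);
  byte[] bytes = helper.serialize("anyTopic", alice);
  KafkaOutputOperatorTest.Person copy = helper.deserialize("anyTopic", bytes);
  // Holds because Person implements value-based equals() (see test below).
  assert alice.equals(copy);
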
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index 5d0e59a..58d69f6 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -25,6 +25,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -33,32 +44,20 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.stram.StramLocalCluster;
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
-@Ignore
 public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
 {
   String testName;
-  private static List<String> tupleCollection = new LinkedList<>();
-  private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-  private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  private static List<Person> tupleCollection = new LinkedList<>();
+  private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+  private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
 
-  public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+  public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
 
   @Before
   public void before()
@@ -71,14 +70,20 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     }
   }
 
+  @After
+  public void after()
+  {
+    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+  }
+
   @Test
   public void testExactlyOnceWithFailure() throws Exception
   {
-    List<String> toKafka = GenerateList();
+    List<Person> toKafka = GenerateList();
 
     sendDataToKafka(true, toKafka, true, false);
 
-    List<String> fromKafka = ReadFromKafka();
+    List<Person> fromKafka = ReadFromKafka();
 
     Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
   }
@@ -86,11 +91,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
   @Test
   public void testExactlyOnceWithNoFailure() throws Exception
   {
-    List<String> toKafka = GenerateList();
+    List<Person> toKafka = GenerateList();
 
     sendDataToKafka(true, toKafka, false, false);
 
-    List<String> fromKafka = ReadFromKafka();
+    List<Person> fromKafka = ReadFromKafka();
 
     Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
   }
@@ -98,14 +103,14 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
   @Test
   public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
   {
-    List<String> toKafka = GenerateList();
+    List<Person> toKafka = GenerateList();
 
     try {
       sendDataToKafka(true, toKafka, true, true);
     } catch (RuntimeException ex) {
 
       boolean expectedException = false;
-      if ( ex.getMessage().contains("Violates")) {
+      if (ex.getMessage().contains("Violates")) {
         expectedException = true;
       }
 
@@ -119,11 +124,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
   @Test
   public void testKafkaOutput() throws Exception
   {
-    List<String> toKafka = GenerateList();
+    List<Person> toKafka = GenerateList();
 
     sendDataToKafka(false, toKafka, false, false);
 
-    List<String> fromKafka = ReadFromKafka();
+    List<Person> fromKafka = ReadFromKafka();
 
     Assert.assertTrue("No failure", compare(fromKafka, toKafka));
   }
@@ -131,38 +136,42 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
   @Test
   public void testKafkaOutputWithFailure() throws Exception
   {
-    List<String> toKafka = GenerateList();
+    List<Person> toKafka = GenerateList();
 
     sendDataToKafka(false, toKafka, true, true);
 
-    List<String> fromKafka = ReadFromKafka();
+    List<Person> fromKafka = ReadFromKafka();
 
     Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
   }
 
-  private void sendDataToKafka(boolean exactlyOnce, List<String> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
+  private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
   {
     Properties props = new Properties();
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+    if (!exactlyOnce) {
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
+    }
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
 
     Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
+    attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
 
     OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
 
     cleanUp(operatorContext);
 
     Operator kafkaOutput;
-    DefaultInputPort<String> inputPort;
+    DefaultInputPort<Person> inputPort;
 
-    if ( exactlyOnce ) {
-      KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+    if (exactlyOnce) {
+      KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
       inputPort = kafkaOutputTemp.inputPort;
       kafkaOutput = kafkaOutputTemp;
     } else {
-      KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+      KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
       inputPort = kafkaOutputTemp.inputPort;
       kafkaOutput = kafkaOutputTemp;
     }
@@ -181,14 +190,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     inputPort.getSink().put(toKafka.get(6));
     inputPort.getSink().put(toKafka.get(7));
 
-    if ( hasFailure ) {
-
-      if ( exactlyOnce ) {
-        KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+    if (hasFailure) {
+      if (exactlyOnce) {
+        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
         inputPort = kafkaOutputTemp.inputPort;
         kafkaOutput = kafkaOutputTemp;
       } else {
-        KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+        KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
         inputPort = kafkaOutputTemp.inputPort;
         kafkaOutput = kafkaOutputTemp;
       }
@@ -217,9 +225,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     cleanUp(operatorContext);
   }
 
-  private KafkaSinglePortExactlyOnceOutputOperator<String> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
   {
-    KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
+    KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
     kafkaOutput.setTopic(testName);
     kafkaOutput.setProperties(props);
     kafkaOutput.setup(operatorContext);
@@ -227,9 +235,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     return kafkaOutput;
   }
 
-  private KafkaSinglePortOutputOperator<String,String> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
   {
-    KafkaSinglePortOutputOperator<String,String> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+    KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
     kafkaOutput.setTopic(testName);
     kafkaOutput.setProperties(props);
     kafkaOutput.setup(operatorContext);
@@ -239,7 +247,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
 
   private void cleanUp(Context.OperatorContext operatorContext)
   {
-    WindowDataManager windowDataManager = new FSWindowDataManager();
+    FSWindowDataManager windowDataManager = new FSWindowDataManager();
     windowDataManager.setup(operatorContext);
     try {
       windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
@@ -248,7 +256,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     }
   }
 
-  private boolean compare(List<String> fromKafka, List<String> toKafka)
+  private boolean compare(List<Person> fromKafka, List<Person> toKafka)
   {
     if (fromKafka.size() != toKafka.size()) {
       return false;
@@ -272,19 +280,18 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
         (hasMultiCluster && hasMultiPartition ? "," + l  + TEST_KAFKA_BROKER_PORT[1][1] : "");
   }
 
-  private List<String> GenerateList()
+  private List<Person> GenerateList()
   {
-    List<String> strings = new ArrayList<>();
+    List<Person> people = new ArrayList<>();
 
     for (Integer i = 0; i < 12; ++i) {
-
-      strings.add(i.toString());
+      people.add(new Person(i.toString(), i));
     }
 
-    return strings;
+    return people;
   }
 
-  public List<String> ReadFromKafka()
+  private List<Person> ReadFromKafka()
   {
     tupleCollection.clear();
 
@@ -292,11 +299,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     Properties props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
     props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
-    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
     props.put(GROUP_ID_CONFIG, "KafkaTest");
 
-
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
 
@@ -316,7 +322,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     // Connect ports
     dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
 
-
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
@@ -328,7 +333,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
 
   public static class CollectorModule extends BaseOperator
   {
-
     public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
 
     long currentWindowId;
@@ -353,7 +357,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     {
       super.endWindow();
     }
-
   }
 
   public static class CollectorInputPort extends DefaultInputPort<byte[]>
@@ -368,8 +371,52 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     @Override
     public void process(byte[] bt)
     {
-      String tuple = new String(bt);
-      tupleCollection.add(tuple);
+      tupleCollection.add(new KafkaHelper().deserialize("r", bt));
+    }
+  }
+
+  public static class Person
+  {
+    public String name;
+    public Integer age;
+
+    public Person(String name, Integer age)
+    {
+      this.name = name;
+      this.age = age;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      Person person = (Person)o;
+
+      if (name != null ? !name.equals(person.name) : person.name != null) {
+        return false;
+      }
+
+      return age != null ? age.equals(person.age) : person.age == null;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = name != null ? name.hashCode() : 0;
+      result = 31 * result + (age != null ? age.hashCode() : 0);
+      return result;
+    }
+
+    @Override
+    public String toString()
+    {
+      return name + age.toString();
     }
   }
 }