You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/05 18:26:26 UTC

[storm] branch master updated: [STORM-3620] fix data corruption in the case of multi-threaded bolts caused by non thread-safe serializers (#3280)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new fcfc32d  [STORM-3620] fix data corruption in the case of multi-threaded bolts caused by non thread-safe serializers (#3280)
fcfc32d is described below

commit fcfc32dfd537d9d99586c97459b4d465a0355b46
Author: Meng Li (Ethan) <et...@gmail.com>
AuthorDate: Fri Jun 5 13:26:17 2020 -0500

    [STORM-3620] fix data corruption in the case of multi-threaded bolts caused by non thread-safe serializers (#3280)
---
 .../starter/MultiThreadWordCountTopology.java      | 107 +++++++++++
 .../apache/storm/starter/WordCountTopology.java    |  24 +--
 .../apache/storm/starter/bolt/WordCountBolt.java   |  43 +++++
 .../apache/storm/daemon/worker/WorkerTransfer.java |   4 +-
 .../apache/storm/executor/ExecutorTransfer.java    |  12 +-
 .../org.mockito.plugins.MockMaker                  |  16 ++
 .../ExecutorTransferMultiThreadingTest.java        | 195 +++++++++++++++++++++
 7 files changed, 372 insertions(+), 29 deletions(-)

diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultiThreadWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultiThreadWordCountTopology.java
new file mode 100644
index 0000000..2d39855
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultiThreadWordCountTopology.java
@@ -0,0 +1,107 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+  2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+  and limitations under the License.
+ */
+
+package org.apache.storm.starter;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.storm.Config;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.starter.bolt.WordCountBolt;
+import org.apache.storm.starter.spout.RandomSentenceSpout;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.ConfigurableTopology;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Some topologies spawn threads inside bolts to do work and emit tuples from those threads.
+ * This simple word-count topology mimics those use cases and can help catch race conditions on that emit path.
+ */
+public class MultiThreadWordCountTopology extends ConfigurableTopology {
+    public static void main(String[] args) {
+        ConfigurableTopology.start(new MultiThreadWordCountTopology(), args);
+    }
+
+    @Override
+    protected int run(String[] args) {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 1);
+        builder.setBolt("split", new MultiThreadedSplitSentence(), 1).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCountBolt(), 1).fieldsGrouping("split", new Fields("word"));
+
+        //only one executor per worker, which is easier to debug:
+        //serialization/deserialization problems only show up in inter-worker data transfer
+        conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+        //registering a metrics consumer makes metricsTick part of the picture
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+        conf.setTopologyWorkerMaxHeapSize(128);
+
+        String topologyName = "multithreaded-word-count";
+
+        if (args != null && args.length > 0) {
+            topologyName = args[0];
+        }
+        return submit(topologyName, conf, builder);
+    }
+
+    public static class MultiThreadedSplitSentence implements IRichBolt {
+
+        private OutputCollector collector;
+        private ExecutorService executor;
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+            executor = Executors.newFixedThreadPool(6);
+            //a metric with a 1-second time bucket makes metricsTick run every second,
+            //so any race between metricsTick and the OutputCollector is more likely to show up
+            context.registerMetric("dummy-counter", () -> 0, 1);
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            String str = input.getString(0);
+            String[] splits = str.split("\\s+");
+            for (String s : splits) {
+                //emit from pool threads instead of the executor thread that called execute()
+                Runnable emitTask = () -> collector.emit(new Values(s));
+                executor.submit(emitTask);
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+
+        @Override
+        public void cleanup() {
+            //release the emitter threads created in prepare(); otherwise they outlive the bolt
+            executor.shutdown();
+        }
+    }
+}
\ No newline at end of file
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index 628aad7..ff3e4c3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -14,6 +14,7 @@ package org.apache.storm.starter;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.starter.bolt.WordCountBolt;
 import org.apache.storm.starter.spout.RandomSentenceSpout;
 import org.apache.storm.task.ShellBolt;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -43,7 +44,7 @@ public class WordCountTopology extends ConfigurableTopology {
         builder.setSpout("spout", new RandomSentenceSpout(), 5);
 
         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+        builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
 
         conf.setDebug(true);
 
@@ -73,25 +74,4 @@ public class WordCountTopology extends ConfigurableTopology {
             return null;
         }
     }
-
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null) {
-                count = 0;
-            }
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
 }
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/WordCountBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/WordCountBolt.java
new file mode 100644
index 0000000..870d5bd
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/WordCountBolt.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.starter.bolt;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+public class WordCountBolt extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        String word = tuple.getString(0);
+        Integer count = counts.get(word);
+        if (count == null) {
+            count = 0;
+        }
+        count++;
+        counts.put(word, count);
+        collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 916850e..68f4d82 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // Transfers messages destined to other workers
-class WorkerTransfer implements JCQueue.Consumer {
+public class WorkerTransfer implements JCQueue.Consumer {
     static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
 
     private final TransferDrainer drainer;
@@ -50,7 +50,7 @@ class WorkerTransfer implements JCQueue.Consumer {
 
     private final AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP.
 
-    WorkerTransfer(WorkerState workerState, Map<String, Object> topologyConf, int maxTaskIdInTopo) {
+    public WorkerTransfer(WorkerState workerState, Map<String, Object> topologyConf, int maxTaskIdInTopo) {
         this.workerState = workerState;
         this.backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
         this.drainer = new TransferDrainer();
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
index 4121a85..ffd25c4 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
@@ -31,17 +32,18 @@ public class ExecutorTransfer {
     private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
 
     private final WorkerState workerData;
-    private final KryoTupleSerializer serializer;
+    // KryoTupleSerializer is not thread-safe: keep one instance per emitting thread to avoid corrupting serialized tuples
+    private final ThreadLocal<KryoTupleSerializer> threadLocalSerializer;
     private final boolean isDebug;
     private int indexingBase = 0;
     private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
     private AtomicReferenceArray<JCQueue> queuesToFlush;
     // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
 
-
     public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
         this.workerData = workerData;
-        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
+        WorkerTopologyContext workerTopologyContext = workerData.getWorkerTopologyContext();
+        this.threadLocalSerializer = ThreadLocal.withInitial(() -> new KryoTupleSerializer(topoConf, workerTopologyContext));
         this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
     }
 
@@ -63,7 +65,7 @@ public class ExecutorTransfer {
         if (localQueue != null) {
             return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
         }
-        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
+        return workerData.tryTransferRemote(addressedTuple, pendingEmits, threadLocalSerializer.get());
     }
 
 
@@ -96,7 +98,7 @@ public class ExecutorTransfer {
      * Returns false if unable to add to localQueue.
      */
     public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
-        workerData.checkSerialize(serializer, tuple);
+        workerData.checkSerialize(threadLocalSerializer.get(), tuple);
         if (pendingEmits != null) {
             if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
                 queuesToFlush.set(tuple.dest - indexingBase, localQueue);
diff --git a/storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker b/storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ea0ecfb
--- /dev/null
+++ b/storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+mock-maker-inline
\ No newline at end of file
diff --git a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
new file mode 100644
index 0000000..5dcb128
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.Constants;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.daemon.worker.WorkerTransfer;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.policy.WaitStrategyPark;
+import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.FieldSetter;
+
+/**
+ * Some topologies might spawn extra threads inside components to perform real processing work and emit processed results.
+ * This unit test is to mimic these scenarios in the {@link ExecutorTransfer} level
+ * and make sure the tuples sent out in a multi-threading fashion to the workerTransferQueue
+ * is properly handled and received by the remote Worker (consumer)
+ *
+ * The topology structure this test mimics is
+ * {worker1: taskId=1, component="1"} --> {worker2: taskId=2, component="2"}.
+ */
+public class ExecutorTransferMultiThreadingTest {
+
+    private WorkerState workerState;
+    private Map<String, Object> topoConf;
+    private JCQueue transferQueue;
+    private GeneralTopologyContext generalTopologyContext;
+    private int selfTaskId = 1;
+    private String sourceComp = "1";
+    private int remoteTaskId = 2;
+    private String destComp = "2";
+    private static String value1 = "string-value";
+    private static int value2 = 1234;
+
+    @Before
+    public void setup() throws NoSuchFieldException {
+        topoConf = Utils.readStormConfig();
+        String topologyId = "multi-threaded-topo-test";
+        StormTopology stormTopology = createStormTopology();
+
+        WorkerTopologyContext workerTopologyContext = mock(WorkerTopologyContext.class);
+        when(workerTopologyContext.getRawTopology()).thenReturn(stormTopology);
+        when(workerTopologyContext.getComponentId(selfTaskId)).thenReturn(sourceComp);
+        when(workerTopologyContext.getComponentId(remoteTaskId)).thenReturn(destComp);
+
+        workerState = mock(WorkerState.class);
+        when(workerState.getWorkerTopologyContext()).thenReturn(workerTopologyContext);
+        Map<Integer, JCQueue> receiveQMap = new HashMap<>();
+        //local recvQ is not important in this test; simple mock it
+        receiveQMap.put(selfTaskId, mock(JCQueue.class));
+        when(workerState.getLocalReceiveQueues()).thenReturn(receiveQMap);
+        when(workerState.getTopologyId()).thenReturn(topologyId);
+        when(workerState.getPort()).thenReturn(6701);
+        when(workerState.getMetricRegistry()).thenReturn(new StormMetricRegistry());
+        when(workerState.tryTransferRemote(any(), any(), any())).thenCallRealMethod();
+
+        //the actual worker transfer queue to be used in this test
+        //taskId for worker transfer queue should be -1.
+        //But there is already one worker transfer queue initialized by WorkerTransfer class (taskId=-1).
+        //However the taskId is only used for metrics and it is not important here. Making it -100 to avoid collision.
+        transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
+            workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-100), workerState.getPort(),
+            workerState.getMetricRegistry());
+
+        //Replace the transferQueue inside WorkerTransfer (inside WorkerState) with the customized transferQueue to be used in this test
+        WorkerTransfer workerTransfer = new WorkerTransfer(workerState, topoConf, 2);
+        FieldSetter.setField(workerTransfer, workerTransfer.getClass().getDeclaredField("transferQueue"), transferQueue);
+        FieldSetter.setField(workerState, workerState.getClass().getDeclaredField("workerTransfer"), workerTransfer);
+
+        generalTopologyContext = mock(GeneralTopologyContext.class);
+    }
+
+    @Test
+    public void testExecutorTransfer() throws InterruptedException {
+        //There is one ExecutorTransfer per executor
+        ExecutorTransfer executorTransfer = new ExecutorTransfer(workerState, topoConf);
+        executorTransfer.initLocalRecvQueues();
+        ExecutorService executorService = Executors.newFixedThreadPool(5);
+
+        //There can be multiple producer threads sending out tuples inside each executor
+        //This mimics the case of multi-threading components where a component spawns extra threads to emit tuples.
+        int producerTaskNum = 10;
+        Runnable[] producerTasks = new Runnable[producerTaskNum];
+        for (int i = 0; i < producerTaskNum; i++) {
+            producerTasks[i] = createProducerTask(executorTransfer);
+        }
+        for (Runnable task : producerTasks) {
+            executorService.submit(task);
+        }
+
+        //give producers enough time to insert messages into the queue
+        executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+        //consume all the tuples in the queue and deserialize them one by one
+        //this mimics a remote worker.
+        KryoTupleDeserializer deserializer = new KryoTupleDeserializer(topoConf, workerState.getWorkerTopologyContext());
+        SingleThreadedConsumer consumer = new SingleThreadedConsumer(deserializer, producerTaskNum);
+        transferQueue.consume(consumer);
+        consumer.finalCheck();
+    }
+
+    private Runnable createProducerTask(ExecutorTransfer executorTransfer) {
+        return new Runnable() {
+            Tuple tuple = new TupleImpl(generalTopologyContext, new Values(value1, value2), sourceComp, selfTaskId, "default");
+            AddressedTuple addressedTuple = new AddressedTuple(remoteTaskId, tuple);
+
+            @Override
+            public void run() {
+                executorTransfer.tryTransfer(addressedTuple, null);
+            }
+        };
+    }
+
+    private StormTopology createStormTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(sourceComp, new TestWordSpout(true), 1);
+        builder.setBolt(destComp, new TestWordCounter(), 1).fieldsGrouping(sourceComp, new Fields("word"));
+        return builder.createTopology();
+    }
+
+    private static class SingleThreadedConsumer implements JCQueue.Consumer {
+        KryoTupleDeserializer deserializer;
+        int numMessages;
+        int msgCount = 0;
+
+        public SingleThreadedConsumer(KryoTupleDeserializer deserializer, int numMessages) {
+            this.deserializer = deserializer;
+            this.numMessages = numMessages;
+        }
+
+        /**
+         * There are multiple producers sending messages to the queue simultaneously.
+         * The consumer receives messages one by one and tries to deserialize them.
+         * If there is any issues/exceptions during the process, it basically means data corruption is happening.
+         * @param o the received object
+         */
+        @Override
+        public void accept(Object o) {
+            TaskMessage taskMessage = (TaskMessage) o;
+            TupleImpl receivedTuple = deserializer.deserialize(taskMessage.message());
+            Assert.assertEquals(receivedTuple.getValue(0), value1);
+            Assert.assertEquals(receivedTuple.getValue(1), value2);
+            msgCount++;
+        }
+
+        /**
+         * This makes sure every message sent by the producers are received by this consumer.
+         */
+        public void finalCheck() {
+            Assert.assertEquals(numMessages, msgCount);
+        }
+
+        @Override
+        public void flush() {
+            //no op
+        }
+    }
+}
\ No newline at end of file