You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/01 18:51:30 UTC

samza git commit: SAMZA-1305. SAMZA-1306. Unit test - zk unavailable.

Repository: samza
Updated Branches:
  refs/heads/master 19f4d1295 -> a90ca11ef


SAMZA-1305. SAMZA-1306. Unit test - zk unavailable.

Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #198 from sborya/unitTestZkUnavailable2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a90ca11e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a90ca11e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a90ca11e

Branch: refs/heads/master
Commit: a90ca11efc7c4ef31b70814caf74233dfc5cea3e
Parents: 19f4d12
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Jun 1 11:51:22 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Thu Jun 1 11:51:22 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java |  29 +-
 .../processor/TestZkStreamProcessorBase.java    | 336 +++++++++++++++++++
 .../test/processor/TestZkStreamProcessor.java   | 151 +++------
 .../processor/TestZkStreamProcessorBase.java    | 270 ---------------
 .../TestZkStreamProcessorFailures.java          | 147 ++++++++
 5 files changed, 545 insertions(+), 388 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5c6a474..c0a2163 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -20,6 +20,11 @@ package org.apache.samza.processor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
@@ -40,12 +45,6 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 /**
  * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
  * independent process.
@@ -93,7 +92,7 @@ public class StreamProcessor {
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener);
+    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener, null);
   }
 
   /**
@@ -106,7 +105,7 @@ public class StreamProcessor {
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener);
+    this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
   }
 
   /* package private */
@@ -118,7 +117,6 @@ public class StreamProcessor {
         .getJobCoordinator(config);
   }
 
-  @VisibleForTesting
   StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
                   StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
     this.taskFactory = taskFactory;
@@ -126,22 +124,11 @@ public class StreamProcessor {
     this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.customMetricsReporter = customMetricsReporters;
     this.processorListener = processorListener;
-    this.jobCoordinator = jobCoordinator;
+    this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
   }
 
-  private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
-                          Object taskFactory, StreamProcessorLifecycleListener processorListener) {
-    this.taskFactory = taskFactory;
-    this.config = config;
-    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
-    this.customMetricsReporter = customMetricsReporters;
-    this.processorListener = processorListener;
-    this.jobCoordinator = getJobCoordinator();
-    this.jobCoordinator.setListener(createJobCoordinatorListener());
-  }
-
   /**
    * Asynchronously starts this {@link StreamProcessor}.
    * <p>

http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
new file mode 100644
index 0000000..a315083
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.StandaloneIntegrationTestHarness;
+import org.apache.samza.test.StandaloneTestUtils;
+import org.apache.samza.util.Util;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
+  public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
+  public final static int BAD_MESSAGE_KEY = 1000;
+  // to avoid long sleeps, we rather use multiple attempts with shorter sleeps
+  protected final static int ATTEMPTS_NUMBER = 5;
+
+  protected AtomicInteger counter = new AtomicInteger(1);
+  protected String testSystem;
+  protected String inputTopic;
+  protected String outputTopic;
+  protected int messageCount = 40;
+
+  protected Map<String, String> map;
+
+  protected String prefix() {
+    return "";
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    // for each tests - make the common parts unique
+    int seqNum = counter.getAndAdd(1);
+    testSystem = prefix() + "test-system" + seqNum;
+    inputTopic = prefix() + "numbers" + seqNum;
+    outputTopic = prefix() + "output" + seqNum;
+
+    map = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
+
+    // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
+    // TopicExistsException since StreamProcessor auto-creates them.
+    createTopics(inputTopic, outputTopic);
+  }
+
+  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map, final Object mutexStart,
+      final Object mutexStop) {
+    map.put(ApplicationConfig.PROCESSOR_ID, pId);
+
+    Config config = new MapConfig(map);
+    JobCoordinator jobCoordinator =
+        Util.<JobCoordinatorFactory>getObj(new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName())
+            .getJobCoordinator(config);
+
+    StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() {
+      @Override
+      public void onStart() {
+        if (mutexStart != null) {
+          synchronized (mutexStart) {
+            mutexStart.notifyAll();
+          }
+        }
+        LOG.info("onStart is called for pid=" + pId);
+      }
+
+      @Override
+      public void onShutdown() {
+        if (mutexStop != null) {
+          synchronized (mutexStart) {
+            mutexStart.notify();
+          }
+        }
+        LOG.info("onShutdown is called for pid=" + pId);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.info("onFailure is called for pid=" + pId);
+      }
+    };
+
+    StreamProcessor processor =
+        new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) TestStreamTask::new, listener, jobCoordinator);
+
+    return processor;
+  }
+
+  protected void createTopics(String inputTopic, String outputTopic) {
+    TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties());
+    TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties());
+  }
+
+  protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
+      int messageCount) {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(StandaloneTestUtils
+        .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask"));
+    configs.putAll(StandaloneTestUtils
+        .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
+            true));
+    configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
+    configs.put("app.messageCount", String.valueOf(messageCount));
+    configs.put("app.outputTopic", outputTopic);
+    configs.put("app.outputSystem", testSystem);
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+
+    configs.put("job.systemstreampartition.grouper.factory",
+        "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
+    configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
+
+    return configs;
+  }
+
+  /**
+   * Produces the provided number of messages to the topic.
+   */
+  protected void produceMessages(final int start, String topic, int numMessages) {
+    KafkaProducer producer = getKafkaProducer();
+    for (int i = start; i < numMessages + start; i++) {
+      try {
+        LOG.info("producing " + i);
+        producer.send(new ProducerRecord(topic, i % 2, String.valueOf(i), String.valueOf(i).getBytes())).get();
+      } catch (InterruptedException | ExecutionException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout,
+   * and then stopping it.
+   */
+  protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
+    Thread t = new Thread() {
+
+      @Override
+      public void run() {
+        processor.start();
+        try {
+          // just wait
+          synchronized (this) {
+            this.wait(100000);
+          }
+          LOG.info("notified. Abandon the wait.");
+        } catch (InterruptedException e) {
+          LOG.error("wait interrupted" + e);
+        }
+        LOG.info("Stopping the processor");
+        processor.stop();
+      }
+    };
+    return t;
+  }
+
+  // for sequential values we can generate them automatically
+  protected void verifyNumMessages(String topic, int numberOfSequentialValues, int expectedNumMessages) {
+    // we should get each value one time
+    // create a map of all expected values to validate
+    Map<Integer, Boolean> expectedValues = new HashMap<>(numberOfSequentialValues);
+    for (int i = 0; i < numberOfSequentialValues; i++) {
+      expectedValues.put(i, false);
+    }
+    verifyNumMessages(topic, expectedValues, expectedNumMessages);
+  }
+
+  /**
+   * Consumes data from the topic until there are no new messages for a while
+   * and asserts that the number of consumed messages is as expected.
+   */
+  protected void verifyNumMessages(String topic, final Map<Integer, Boolean> expectedValues, int expectedNumMessages) {
+    KafkaConsumer consumer = getKafkaConsumer();
+    consumer.subscribe(Collections.singletonList(topic));
+
+    Map<Integer, Boolean> map = new HashMap<>(expectedValues);
+    int count = 0;
+    int emptyPollCount = 0;
+
+    while (count < expectedNumMessages && emptyPollCount < 5) {
+      ConsumerRecords records = consumer.poll(5000);
+      if (!records.isEmpty()) {
+        Iterator<ConsumerRecord> iterator = records.iterator();
+        while (iterator.hasNext()) {
+          ConsumerRecord record = iterator.next();
+          String val = new String((byte[]) record.value());
+          LOG.info("Got value " + val + "; count = " + count + "; out of " + expectedNumMessages);
+          Integer valI = Integer.valueOf(val);
+          if (valI < BAD_MESSAGE_KEY) {
+            map.put(valI, true);
+            count++;
+          }
+        }
+      } else {
+        emptyPollCount++;
+        LOG.warn("empty polls " + emptyPollCount);
+      }
+    }
+    // filter out numbers we did not get
+    long numFalse = map.values().stream().filter(v -> !v).count();
+    Assert.assertEquals("didn't get this number of events ", 0, numFalse);
+    Assert.assertEquals(expectedNumMessages, count);
+  }
+
+  protected void waitUntilMessagesLeftN(int untilLeft) {
+    int attempts = ATTEMPTS_NUMBER;
+    while (attempts > 0) {
+      long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount();
+      //System.out.println("2current count = " + leftEventsCount);
+      if (leftEventsCount == untilLeft) { // that much should be left
+        System.out.println("2read all. current count = " + leftEventsCount);
+        break;
+      }
+      TestZkUtils.sleepMs(1000);
+      attempts--;
+    }
+    Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
+  }
+
+  protected void waitForProcessorToStartStop(Object waitObject) {
+    try {
+      synchronized (waitObject) {
+        waitObject.wait(1000);
+      }
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the first processor to start.");
+    }
+  }
+
+  protected void stopProcessor(Thread threadName) {
+    synchronized (threadName) {
+      threadName.notify();
+    }
+  }
+
+  // StreamTaskClass
+  public static class TestStreamTask implements StreamTask, InitableTask {
+    // static field since there's no other way to share state b/w a task instance and
+    // stream processor when constructed from "task.class".
+    public static CountDownLatch endLatch;
+    protected int processedMessageCount = 0;
+    protected String processorId;
+    protected String outputTopic;
+    protected String outputSystem;
+    protected String processorIdToFail;
+
+    @Override
+    public void init(Config config, TaskContext taskContext)
+        throws Exception {
+      this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
+      this.outputTopic = config.get("app.outputTopic", "output");
+      this.outputSystem = config.get("app.outputSystem", "test-system");
+      this.processorIdToFail = config.get("processor.id.to.fail", "1");
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
+        TaskCoordinator taskCoordinator)
+        throws Exception {
+
+      Object message = incomingMessageEnvelope.getMessage();
+
+      String key = new String((byte[]) incomingMessageEnvelope.getKey());
+      Integer val = Integer.valueOf((String) message);
+
+      LOG.info("Stream processor " + processorId + ";key=" + key + ";offset=" + incomingMessageEnvelope.getOffset()
+          + "; totalRcvd=" + processedMessageCount + ";val=" + val + "; ssp=" + incomingMessageEnvelope
+          .getSystemStreamPartition());
+
+      // inject a failure
+      if (val >= BAD_MESSAGE_KEY && processorId.equals(processorIdToFail)) {
+        LOG.info("process method failing for msg=" + message);
+        throw new Exception("Processing in the processor " + processorId + " failed ");
+      }
+
+      messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message));
+      processedMessageCount++;
+
+      synchronized (endLatch) {
+        if (Integer.valueOf(key) < BAD_MESSAGE_KEY) {
+          endLatch.countDown();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
index b409532..05b6ebe 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
@@ -19,14 +19,12 @@
 
 package org.apache.samza.test.processor;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.TestZkStreamProcessorBase;
 import org.apache.samza.zk.TestZkUtils;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 
@@ -36,28 +34,9 @@ import org.junit.Test;
  */
 public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
 
-  private AtomicInteger counter = new AtomicInteger(1);
-  private String testSystem;
-  private String inputTopic;
-  private String outputTopic;
-  private int messageCount = 40;
-
-  private Map<String, String> map;
-
-
-  @Before
-  public void setupTest() {
-    // for each tests - make the common parts unique
-    int seqNum = counter.getAndAdd(1);
-    testSystem = "test-system" + seqNum;
-    inputTopic = "numbers" + seqNum;
-    outputTopic = "output" + seqNum;
-
-    map = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
-
-    // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
-    // TopicExistsException since StreamProcessor auto-creates them.
-    createTopics(inputTopic, outputTopic);
+  @Override
+  protected String prefix() {
+    return "test_ZK_";
   }
 
   @Test
@@ -67,40 +46,42 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
 
   @Test
   public void testTwoStreamProcessors() {
-    testStreamProcessor(new String[]{"1", "2"});
+    testStreamProcessor(new String[]{"2", "3"});
   }
 
   @Test
   public void testFiveStreamProcessors() {
-    testStreamProcessor(new String[]{"1", "2", "3", "4", "5"});
+    testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
   }
 
   // main test method for happy path with fixed number of processors
   private void testStreamProcessor(String[] processorIds) {
 
-    // create a latch of the size == number of messages
+    // create a latch of the size equals to the number of messages
     TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
 
-    // initialize the the processors
-    // we need startLatch to know when the processor has been completely initialized
+    // initialize the processors
     StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
-    CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length];
+    // we need to know when the processor has started
+    Object[] startWait = new Object[processorIds.length];
     for (int i = 0; i < processorIds.length; i++) {
-      startCountDownLatches[i] = new CountDownLatch(1);
-      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null);
+      startWait[i] = new Object();
+      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
     }
 
-    // produce messageCount messages, starting with key '0'
+    // produce messageCount messages, starting with key 0
     produceMessages(0, inputTopic, messageCount);
 
-    // run the processors in a separate threads
+    // run the processors in separate threads
     Thread[] threads = new Thread[processorIds.length];
     for (int i = 0; i < processorIds.length; i++) {
       threads[i] = runInThread(streamProcessors[i], TestZkStreamProcessorBase.TestStreamTask.endLatch);
       threads[i].start();
       // wait until the processor reports that it has started
       try {
-        startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS);
+        synchronized (startWait[i]) {
+          startWait[i].wait(1000);
+        }
       } catch (InterruptedException e) {
         Assert.fail("got interrupted while waiting for the " + i + "th processor to start.");
       }
@@ -116,9 +97,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     // collect all the threads
     try {
       for (Thread t : threads) {
-        synchronized (t) {
-          t.notify(); // to stop the thread
-        }
+        stopProcessor(t);
         t.join(1000);
       }
     } catch (InterruptedException e) {
@@ -133,7 +112,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
    * Similar to the previous tests, but add another processor in the middle
    */ public void testStreamProcessorWithAdd() {
 
-    // set number of events we expect wo read by both processes in total:
+    // set number of events we expect to read by both processes in total:
     // p1 - reads 'messageCount' at first
     // p1 and p2 read all messageCount together, since they start from the beginning.
     // so we expect total 3 x messageCounts
@@ -141,8 +120,9 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
 
     // create first processor
-    CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
-    StreamProcessor sp = createStreamProcessor("1", map, startCountDownLatch1, null);
+    Object startWait1 = new Object();
+    Object stopWait1 = new Object();
+    StreamProcessor sp = createStreamProcessor("20", map, startWait1, stopWait1);
 
     // produce first batch of messages starting with 0
     produceMessages(0, inputTopic, messageCount);
@@ -152,46 +132,36 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     t1.start();
 
     // wait until the processor reports that it has started
-    try {
-      startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the first processor to start.");
-    }
+    waitForProcessorToStartStop(startWait1);
 
     // make sure it consumes all the messages from the first batch
-    waitUntilConsumedN(totalEventsToGenerate - messageCount);
+    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
 
     // start the second processor
-    CountDownLatch countDownLatch2 = new CountDownLatch(1);
-    StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+    Object startWait2 = new Object();
+    StreamProcessor sp2 = createStreamProcessor("21", map, startWait2, null);
     Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
     t2.start();
 
-    // wait until the processor reports that it has started
-    try {
-      countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the 2nd processor to start.");
-    }
+    // wait until 2nd processor reports that it has started
+    waitForProcessorToStartStop(startWait2);
+
+    // wait until the 1st processor reports that it has stopped
+    waitForProcessorToStartStop(stopWait1);
 
-    // wait for at least one full debounce time to let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(3000);
+    // let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(600);
 
     // produce the second batch of the messages, starting with 'messageCount'
     produceMessages(messageCount, inputTopic, messageCount);
 
     // wait until all the events are consumed
-    // make sure it consumes all the messages from the first batch
-    waitUntilConsumedN(0);
+    waitUntilMessagesLeftN(0);
 
     // shutdown both
     try {
-      synchronized (t1) {
-        t1.notify();
-      }
-      synchronized (t2) {
-        t2.notify();
-      }
+      stopProcessor(t1);
+      stopProcessor(t2);
       t1.join(1000);
       t2.join(1000);
     } catch (InterruptedException e) {
@@ -216,68 +186,55 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
 
     // create first processor
-    CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
-    CountDownLatch stopCountDownLatch1 = new CountDownLatch(1);
-    StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1);
+    Object waitStart1 = new Object();
+    Object waitStop1 = new Object();
+    StreamProcessor sp1 = createStreamProcessor("30", map, waitStart1, waitStop1);
 
     // start the first processor
     Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
     t1.start();
 
     // start the second processor
-    CountDownLatch countDownLatch2 = new CountDownLatch(1);
-    StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+    Object waitStart2 = new Object();
+    Object waitStop2 = new Object();
+    StreamProcessor sp2 = createStreamProcessor("31", map, waitStart2, waitStop2);
     Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
     t2.start();
 
     // wait until the processor reports that it has started
-    try {
-      startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the first processor to start.");
-    }
+    waitForProcessorToStartStop(waitStart1);
 
     // wait until the processor reports that it has started
-    try {
-      countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the 2nd processor to start.");
-    }
+    waitForProcessorToStartStop(waitStart2);
 
     // produce first batch of messages starting with 0
     produceMessages(0, inputTopic, messageCount);
 
     // make sure they consume all the messages from the first batch
-    waitUntilConsumedN(totalEventsToGenerate - messageCount);
+    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
 
     // stop the first processor
-    synchronized (t1) {
-      t1.notify(); // this should stop it
-    }
+    stopProcessor(t1);
 
     // wait until it's really down
-    try {
-      stopCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
-      System.out.println("Processor 1 is down");
-    } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the 1st processor to stop.");
-    }
+    waitForProcessorToStartStop(waitStop1);
+
+    // processor1 will stop and start again. We wait for its stop to make sure we can count EXACTLY how many messages it reads.
+    waitForProcessorToStartStop(waitStop2);
 
-    // wait for at least one full debounce time to let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(3000);
+    // let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(300);
 
     // produce the second batch of the messages, starting with 'messageCount'
     produceMessages(messageCount, inputTopic, messageCount);
 
     // wait until p2 consumes all the message by itself;
-    waitUntilConsumedN(0);
+    waitUntilMessagesLeftN(0);
 
     // shutdown p2
 
     try {
-      synchronized (t2) {
-        t2.notify();
-      }
+      stopProcessor(t2);
       t2.join(1000);
     } catch (InterruptedException e) {
       Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
deleted file mode 100644
index 9320feb..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.processor;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import kafka.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.ZkConfig;
-import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifecycleListener;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.StandaloneIntegrationTestHarness;
-import org.apache.samza.test.StandaloneTestUtils;
-import org.apache.samza.zk.TestZkUtils;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
-  public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
-  public final static int BAD_MESSAGE_KEY = 1000;
-  // to avoid long sleeps, we rather use multiple attempts with shorter sleeps
-  private final static int ATTEMPTS_NUMBER = 5;
-
-
-
-  // auxiliary methods
-  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map,
-      final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) {
-    map.put(ApplicationConfig.PROCESSOR_ID, pId);
-
-    StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new,
-        new StreamProcessorLifecycleListener() {
-
-          @Override
-          public void onStart() {
-            if (startLatchCountDown != null) {
-              startLatchCountDown.countDown();
-            }
-          }
-
-          @Override
-          public void onShutdown() {
-            if (stopLatchCountDown != null) {
-              stopLatchCountDown.countDown();
-            }
-            LOG.info("onShutdown is called for pid=" + pId);
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LOG.info("onFailure is called for pid=" + pId);
-          }
-        });
-
-    return processor;
-  }
-
-  protected void createTopics(String inputTopic, String outputTopic) {
-    TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties());
-    TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties());
-  }
-
-  protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
-      int messageCount) {
-    Map<String, String> configs = new HashMap<>();
-    configs.putAll(StandaloneTestUtils
-        .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask"));
-    configs.putAll(StandaloneTestUtils
-        .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
-            true));
-    configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
-    configs.put("app.messageCount", String.valueOf(messageCount));
-    configs.put("app.outputTopic", outputTopic);
-    configs.put("app.outputSystem", testSystem);
-    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
-
-    configs.put("job.systemstreampartition.grouper.factory",
-        "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
-    configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
-
-    return configs;
-  }
-
-  /**
-   * Produces the provided number of messages to the topic.
-   */
-  protected void produceMessages(final int start, String topic, int numMessages) {
-    KafkaProducer producer = getKafkaProducer();
-    for (int i = start; i < numMessages + start; i++) {
-      try {
-        LOG.info("producing " + i);
-        producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get();
-      } catch (InterruptedException | ExecutionException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  /**
-   * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout,
-   * and then stopping it.
-   */
-  protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
-    Thread t = new Thread() {
-
-      @Override
-      public void run() {
-        processor.start();
-        try {
-          // just wait
-          synchronized (this) {
-            this.wait(100000);
-          }
-          LOG.info("notified. Abandon the wait.");
-        } catch (InterruptedException e) {
-          LOG.error("wait interrupted" + e);
-        }
-        LOG.info("Stopping the processor");
-        processor.stop();
-      }
-    };
-    return t;
-  }
-
-  // for sequential values we can generate them automatically
-  protected void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) {
-    // we should get each value one time
-    // create a map of all expected values to validate
-    Map<Integer, Boolean> expectedValues = new HashMap<>(numberOfSequentialValues);
-    for (int i = 0; i < numberOfSequentialValues; i++) {
-      expectedValues.put(i, false);
-    }
-    verifyNumMessages(topic, expectedValues, exectedNumMessages);
-  }
-
-  /**
-   * Consumes data from the topic until there are no new messages for a while
-   * and asserts that the number of consumed messages is as expected.
-   */
-  protected void verifyNumMessages(String topic, final Map<Integer, Boolean> expectedValues, int expectedNumMessages) {
-    KafkaConsumer consumer = getKafkaConsumer();
-    consumer.subscribe(Collections.singletonList(topic));
-
-    Map<Integer, Boolean> map = new HashMap<>(expectedValues);
-    int count = 0;
-    int emptyPollCount = 0;
-
-    while (count < expectedNumMessages && emptyPollCount < 5) {
-      ConsumerRecords records = consumer.poll(5000);
-      if (!records.isEmpty()) {
-        Iterator<ConsumerRecord> iterator = records.iterator();
-        while (iterator.hasNext()) {
-          ConsumerRecord record = iterator.next();
-          String val = new String((byte[]) record.value());
-          LOG.info("Got value " + val);
-          map.put(Integer.valueOf(val), true);
-          count++;
-        }
-      } else {
-        emptyPollCount++;
-        LOG.warn("empty polls " + emptyPollCount);
-      }
-    }
-    // filter out numbers we did not get
-    long numFalse = map.values().stream().filter(v -> !v).count();
-    Assert.assertEquals("didn't get this number of events ", 0, numFalse);
-    Assert.assertEquals(expectedNumMessages, count);
-  }
-
-  // StreamTaskClass
-  public static class TestStreamTask implements StreamTask, InitableTask {
-    // static field since there's no other way to share state b/w a task instance and
-    // stream processor when constructed from "task.class".
-    static CountDownLatch endLatch;
-    private int processedMessageCount = 0;
-    private String processorId;
-    private String outputTopic;
-    private String outputSystem;
-
-    @Override
-    public void init(Config config, TaskContext taskContext)
-        throws Exception {
-      this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
-      this.outputTopic = config.get("app.outputTopic", "output");
-      this.outputSystem = config.get("app.outputSystem", "test-system");
-    }
-
-    @Override
-    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
-        TaskCoordinator taskCoordinator)
-        throws Exception {
-
-      Object message = incomingMessageEnvelope.getMessage();
-
-      // inject a failure
-      if (Integer.valueOf((String) message) >= BAD_MESSAGE_KEY && processorId.equals("1")) {
-        LOG.info("process method will fail for msg=" + message);
-        throw new Exception("Processing in the processor " + processorId + " failed ");
-      }
-
-      messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message));
-      processedMessageCount++;
-
-      LOG.info("Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd="
-              + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope
-              .getSystemStreamPartition());
-
-      synchronized (endLatch) {
-        endLatch.countDown();
-      }
-    }
-  }
-
-  protected void waitUntilConsumedN(int untilLeft) {
-    int attempts = ATTEMPTS_NUMBER;
-    while (attempts > 0) {
-      long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount();
-      //System.out.println("2current count = " + leftEventsCount);
-      if (leftEventsCount == untilLeft) { // should read all of them
-        //System.out.println("2read all. current count = " + leftEventsCount);
-        break;
-      }
-      TestZkUtils.sleepMs(1000);
-      attempts--;
-    }
-    Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
new file mode 100644
index 0000000..58d92fb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.TestZkStreamProcessorBase;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Failure tests:
+ * ZK unavailable.
+ * One processor fails in process.
+ */
+public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
+
+  private final static int BAD_MESSAGE_KEY = 1000;
+
+  @Override
+  protected String prefix() {
+    return "test_ZK_failure_";
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+  }
+
+  @Test(expected = org.apache.samza.SamzaException.class)
+  public void testZkUnavailable() {
+    map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
+    map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
+    CountDownLatch startLatch = new CountDownLatch(1);
+    createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception
+    Assert.fail("should've thrown an exception");
+  }
+
+  @Test
+  // Test with a single processor failing.
+  // One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to
+  // throw an exception.
+  public void testFailStreamProcessor() {
+    final int numBadMessages = 4; // either of these bad messages will cause p1 to throw and exception
+    map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "100");
+    map.put("processor.id.to.fail", "101");
+
+    // set number of events we expect to read by both processes in total:
+    // p1 will read messageCount/2 messages
+    // p2 will read messageCount/2 messages
+    // numBadMessages "bad" messages will be generated
+    // p2 will read 2 of the "bad" messages
+    // p1 will fail on the first of the "bad" messages
+    // a new job model will be generated
+    // and p2 will read all 2 * messageCount messages again, + numBadMessages (all of them this time)
+    // total 2 x messageCount / 2 + numBadMessages/2 + 2 * messageCount + numBadMessages
+    int totalEventsToBeConsumed = 3 * messageCount;
+
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToBeConsumed);
+    // create first processor
+    Object waitStart1 = new Object();
+    Object waitStop1 = new Object();
+    StreamProcessor sp1 = createStreamProcessor("101", map, waitStart1, waitStop1);
+    // start the first processor
+    Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
+    t1.start();
+
+    // start the second processor
+    Object waitStart2 = new Object();
+    Object waitStop2 = new Object();
+    StreamProcessor sp2 = createStreamProcessor("102", map, waitStart2, waitStop2);
+    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
+    t2.start();
+
+    // wait until the 1st processor reports that it has started
+    waitForProcessorToStartStop(waitStart1);
+
+    // wait until the 2nd processor reports that it has started
+    waitForProcessorToStartStop(waitStart2);
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // make sure they consume all the messages
+    waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
+
+    // produce the bad messages
+    produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);
+
+    waitForProcessorToStartStop(waitStop1);
+
+    // wait until the 2nd processor reports that it has stopped
+    waitForProcessorToStartStop(waitStop2);
+
+    // give some extra time to let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(300);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until p2 consumes all the message by itself
+    waitUntilMessagesLeftN(0);
+
+    // shutdown p2
+    try {
+      stopProcessor(t2);
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    // number of unique values we gonna read is from 0 to (2*messageCount - 1)
+    Map<Integer, Boolean> expectedValues = new HashMap<>(2 * messageCount);
+    for (int i = 0; i < 2 * messageCount; i++) {
+      expectedValues.put(i, false);
+    }
+    for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) {
+      //expectedValues.put(i, false);
+    }
+
+    verifyNumMessages(outputTopic, expectedValues, totalEventsToBeConsumed);
+  }
+}