You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/07/17 22:57:43 UTC

[1/2] samza git commit: SAMZA-1336. Session disconnect propagation.

Repository: samza
Updated Branches:
  refs/heads/master 45931fd5d -> ce777164d


http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
deleted file mode 100644
index 58d92fb..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.processor;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ZkConfig;
-import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.TestZkStreamProcessorBase;
-import org.apache.samza.zk.TestZkUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Failure tests:
- * ZK unavailable.
- * One processor fails in process.
- */
-public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
-
-  private final static int BAD_MESSAGE_KEY = 1000;
-
-  @Override
-  protected String prefix() {
-    return "test_ZK_failure_";
-  }
-
-  @Before
-  public void setUp() {
-    super.setUp();
-  }
-
-  @Test(expected = org.apache.samza.SamzaException.class)
-  public void testZkUnavailable() {
-    map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
-    map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
-    CountDownLatch startLatch = new CountDownLatch(1);
-    createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception
-    Assert.fail("should've thrown an exception");
-  }
-
-  @Test
-  // Test with a single processor failing.
-  // One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to
-  // throw an exception.
-  public void testFailStreamProcessor() {
-    final int numBadMessages = 4; // either of these bad messages will cause p1 to throw and exception
-    map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "100");
-    map.put("processor.id.to.fail", "101");
-
-    // set number of events we expect to read by both processes in total:
-    // p1 will read messageCount/2 messages
-    // p2 will read messageCount/2 messages
-    // numBadMessages "bad" messages will be generated
-    // p2 will read 2 of the "bad" messages
-    // p1 will fail on the first of the "bad" messages
-    // a new job model will be generated
-    // and p2 will read all 2 * messageCount messages again, + numBadMessages (all of them this time)
-    // total 2 x messageCount / 2 + numBadMessages/2 + 2 * messageCount + numBadMessages
-    int totalEventsToBeConsumed = 3 * messageCount;
-
-    TestStreamTask.endLatch = new CountDownLatch(totalEventsToBeConsumed);
-    // create first processor
-    Object waitStart1 = new Object();
-    Object waitStop1 = new Object();
-    StreamProcessor sp1 = createStreamProcessor("101", map, waitStart1, waitStop1);
-    // start the first processor
-    Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
-    t1.start();
-
-    // start the second processor
-    Object waitStart2 = new Object();
-    Object waitStop2 = new Object();
-    StreamProcessor sp2 = createStreamProcessor("102", map, waitStart2, waitStop2);
-    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
-    t2.start();
-
-    // wait until the 1st processor reports that it has started
-    waitForProcessorToStartStop(waitStart1);
-
-    // wait until the 2nd processor reports that it has started
-    waitForProcessorToStartStop(waitStart2);
-
-    // produce first batch of messages starting with 0
-    produceMessages(0, inputTopic, messageCount);
-
-    // make sure they consume all the messages
-    waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
-
-    // produce the bad messages
-    produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);
-
-    waitForProcessorToStartStop(waitStop1);
-
-    // wait until the 2nd processor reports that it has stopped
-    waitForProcessorToStartStop(waitStop2);
-
-    // give some extra time to let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(300);
-
-    // produce the second batch of the messages, starting with 'messageCount'
-    produceMessages(messageCount, inputTopic, messageCount);
-
-    // wait until p2 consumes all the message by itself
-    waitUntilMessagesLeftN(0);
-
-    // shutdown p2
-    try {
-      stopProcessor(t2);
-      t2.join(1000);
-    } catch (InterruptedException e) {
-      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
-    }
-
-    // number of unique values we gonna read is from 0 to (2*messageCount - 1)
-    Map<Integer, Boolean> expectedValues = new HashMap<>(2 * messageCount);
-    for (int i = 0; i < 2 * messageCount; i++) {
-      expectedValues.put(i, false);
-    }
-    for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) {
-      //expectedValues.put(i, false);
-    }
-
-    verifyNumMessages(outputTopic, expectedValues, totalEventsToBeConsumed);
-  }
-}


[2/2] samza git commit: SAMZA-1336. Session disconnect propagation.

Posted by na...@apache.org.
SAMZA-1336. Session disconnect propagation.

If ZK doesn't receive any communication from a zkClient (including heartbeats), it closes the session with the client. It removes all the ephemeral nodes associated with the client. That's why we need to restore all these nodes - need to re-register.
We are using ZkClient library to connect to zookeeper. This library allows us to get notification when the session is closed and when a new session is created. So when the new session is created we reset all session related state and re-register.
One weird feature of the library/zookeeper is that when a new session is established, it is still possible to receive old notifications. To avoid this we introduced generation number which we pass into each callback. And if the generation number has changed when the callback was invoked, we will ignore this callback.

Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>, Shanthoosh Venkataraman <sv...@linkedin.com>

Closes #229 from sborya/SessionFailReregister


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ce777164
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ce777164
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ce777164

Branch: refs/heads/master
Commit: ce777164dfdef1eda8fa6a9a534a1506dfdfaac5
Parents: 45931fd
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon Jul 17 15:57:29 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Jul 17 15:57:29 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |   2 +-
 .../autoscaling/deployer/ConfigManager.java     |   2 +-
 .../org/apache/samza/config/TaskConfigJava.java |   2 +-
 .../grouper/stream/GroupByPartition.java        |   2 +-
 .../apache/samza/processor/StreamProcessor.java |  20 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |  26 +-
 .../org/apache/samza/zk/ZkControllerImpl.java   |  42 +++-
 .../samza/zk/ZkCoordinationServiceFactory.java  |   1 -
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  93 +++++--
 .../org/apache/samza/zk/ZkLeaderElector.java    |  19 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 112 ++++++++-
 .../org/apache/samza/config/JobConfig.scala     |   2 +-
 .../apache/samza/container/SamzaContainer.scala |   4 +-
 .../apache/samza/system/SystemConsumers.scala   |   2 +-
 .../hdfs/reader/TestMultiFileHdfsReader.java    |   6 +-
 .../system/kafka/KafkaSystemConsumer.scala      |   2 +-
 .../storage/kv/KeyValueStorageEngine.scala      |   2 +-
 .../samza/processor/TestZkStreamProcessor.java  | 252 +++++++++++++++++++
 .../processor/TestZkStreamProcessorBase.java    |  75 ++++--
 .../TestZkStreamProcessorFailures.java          | 151 +++++++++++
 .../processor/TestZkStreamProcessorSession.java | 132 ++++++++++
 .../test/processor/TestZkStreamProcessor.java   | 248 ------------------
 .../TestZkStreamProcessorFailures.java          | 147 -----------
 23 files changed, 846 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 7a1b56e..3263856 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -695,7 +695,7 @@
 
                 <tr>
                     <td class="property" id="task-shutdown-ms">task.shutdown.ms</td>
-                    <td class="default">5000</td>
+                    <td class="default">30000</td>
                     <td class="description">
                         This property controls how long the Samza container will wait for an orderly shutdown of task instances.
                     </td>

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index fd1e039..223d1d6 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -178,7 +178,7 @@ public class ConfigManager {
   }
 
   /**
-   * skip all the unread messages up to the time this function is called.
+   * notAValidEvent all the unread messages up to the time this function is called.
    * This method just reads the messages, and it does not react to them or change any configuration of the system.
    */
   private void skipUnreadMessages() {

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index f7b2bcd..fc9f165 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters;
 public class TaskConfigJava extends MapConfig {
   // Task Configs
   private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
-  public static final long DEFAULT_TASK_SHUTDOWN_MS = 5000L;
+  public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
 
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
   public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 3022b72..8e3ef31 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -56,7 +56,7 @@ public class GroupByPartition implements SystemStreamPartitionGrouper {
     Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<TaskName, Set<SystemStreamPartition>>();
 
     for (SystemStreamPartition ssp : ssps) {
-      // skip the broadcast streams if there is any
+      // notAValidEvent the broadcast streams if there is any
       if (broadcastStreams.contains(ssp)) {
         continue;
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 14a14a8..89edd16 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -63,6 +63,7 @@ public class StreamProcessor {
   private final Map<String, MetricsReporter> customMetricsReporter;
   private final Config config;
   private final long taskShutdownMs;
+  private final String processorId;
 
   private ExecutorService executorService;
 
@@ -71,7 +72,7 @@ public class StreamProcessor {
   
   // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
   // stopped due to re-balancing
-  private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
+  /* package private */volatile CountDownLatch jcContainerShutdownLatch;
   private volatile boolean processorOnStartCalled = false;
 
   @VisibleForTesting
@@ -110,6 +111,11 @@ public class StreamProcessor {
     this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
   }
 
+  @Override
+  public String toString() {
+    return "Processor:" + processorId;
+  }
+
   /* package private */
   JobCoordinator getJobCoordinator() {
     return Util.
@@ -119,6 +125,11 @@ public class StreamProcessor {
         .getJobCoordinator(config);
   }
 
+  @VisibleForTesting
+  JobCoordinator getCurrentJobCoordinator() {
+    return jobCoordinator;
+  }
+
   StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
                   StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
     this.taskFactory = taskFactory;
@@ -129,6 +140,8 @@ public class StreamProcessor {
     this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
+
+    processorId = this.jobCoordinator.getProcessorId();
   }
 
   /**
@@ -197,6 +210,7 @@ public class StreamProcessor {
   }
 
   JobCoordinatorListener createJobCoordinatorListener() {
+    final String pid = this.toString();
     return new JobCoordinatorListener() {
 
       @Override
@@ -206,9 +220,10 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired.");
+              LOGGER.info("Shutting down container in onJobModelExpired for processor:" + pid);
               container.pause();
               shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+              LOGGER.info("ShutdownComplete=" + shutdownComplete);
             } catch (IllegalContainerStateException icse) {
               // Ignored since container is not running
               LOGGER.info("Container was not running.", icse);
@@ -216,6 +231,7 @@ public class StreamProcessor {
             } catch (InterruptedException e) {
               LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
             }
+            LOGGER.info("Shutting down container done for pid=" + pid + "; complete =" + shutdownComplete);
             if (!shutdownComplete) {
               LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " +
                   "Stopping the processor.");

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index c1343b1..196e431 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,8 +19,6 @@
 
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +91,7 @@ public class ZkBarrierForVersionUpgrade {
 
     // subscribe for participant's list changes
     LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
-    zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
+    zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants, zkUtils));
 
     barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version));
   }
@@ -106,7 +104,7 @@ public class ZkBarrierForVersionUpgrade {
    */
   public void join(String version, String participantId) {
     String barrierDonePath = keyBuilder.getBarrierStatePath(version);
-    zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
+    zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version, zkUtils));
 
     // TODO: Handle ZkNodeExistsException - SAMZA-1304
     zkUtils.getZkClient().createPersistent(
@@ -128,17 +126,21 @@ public class ZkBarrierForVersionUpgrade {
    * Listener for changes to the list of participants. It is meant to be subscribed only by the creator of the barrier
    * node. It checks to see when the barrier is ready to be marked as completed.
    */
-  class ZkBarrierChangeHandler implements IZkChildListener {
+  class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener {
     private final String barrierVersion;
     private final List<String> names;
 
-    public ZkBarrierChangeHandler(String barrierVersion, List<String> names) {
+    public ZkBarrierChangeHandler(String barrierVersion, List<String> names, ZkUtils zkUtils) {
+      super(zkUtils, "ZkBarrierChangeHandler");
       this.barrierVersion = barrierVersion;
       this.names = names;
     }
 
     @Override
     public void handleChildChange(String parentPath, List<String> currentChildren) {
+      if (notAValidEvent()) {
+        return;
+      }
       if (currentChildren == null) {
         LOG.info("Got ZkBarrierChangeHandler handleChildChange with null currentChildren");
         return;
@@ -162,11 +164,12 @@ public class ZkBarrierForVersionUpgrade {
    * Barrier state values are either DONE or TIMED_OUT. It only registers to receive on valid state change notification.
    * Once a valid state change notification is received, it will un-subscribe from further notifications.
    */
-  class ZkBarrierReachedHandler implements IZkDataListener {
+  class ZkBarrierReachedHandler extends ZkUtils.GenIZkDataListener {
     private final String barrierStatePath;
     private final String barrierVersion;
 
-    public ZkBarrierReachedHandler(String barrierStatePath, String version) {
+    public ZkBarrierReachedHandler(String barrierStatePath, String version, ZkUtils zkUtils) {
+      super(zkUtils, "ZkBarrierReachedHandler");
       this.barrierStatePath = barrierStatePath;
       this.barrierVersion = version;
     }
@@ -174,6 +177,9 @@ public class ZkBarrierForVersionUpgrade {
     @Override
     public void handleDataChange(String dataPath, Object data) {
       LOG.info("got notification about barrier " + barrierStatePath + "; done=" + data);
+      if (notAValidEvent())
+        return;
+
       zkUtils.unsubscribeDataChanges(barrierStatePath, this);
       barrierListenerOptional.ifPresent(
           zkBarrierListener -> zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data));
@@ -182,7 +188,9 @@ public class ZkBarrierForVersionUpgrade {
     @Override
     public void handleDataDeleted(String dataPath)
         throws Exception {
-      LOG.warn("barrier done got deleted at " + dataPath);
+      LOG.warn("barrier done node got deleted at " + dataPath);
+      if (notAValidEvent())
+        return;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index a8317a8..3af5042 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -19,8 +19,6 @@
 
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.LeaderElector;
 import org.slf4j.Logger;
@@ -49,10 +47,8 @@ public class ZkControllerImpl implements ZkController {
   private void init() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
     zkUtils.makeSurePersistentPathsExists(
-        new String[]{
-            keyBuilder.getProcessorsPath(),
-            keyBuilder.getJobModelVersionPath(),
-            keyBuilder.getJobModelPathPrefix()});
+        new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+            .getJobModelPathPrefix()});
   }
 
   @Override
@@ -62,7 +58,7 @@ public class ZkControllerImpl implements ZkController {
     zkLeaderElector.tryBecomeLeader();
 
     // subscribe to JobModel version updates
-    zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler());
+    zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
   }
 
   @Override
@@ -84,11 +80,16 @@ public class ZkControllerImpl implements ZkController {
 
   @Override
   public void subscribeToProcessorChange() {
-    zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler());
+    zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
   }
 
   // Only by Leader
-  class ProcessorChangeHandler implements IZkChildListener {
+  class ProcessorChangeHandler extends ZkUtils.GenIZkChildListener {
+
+    public ProcessorChangeHandler(ZkUtils zkUtils) {
+      super(zkUtils, "ProcessorChangeHandler");
+    }
+
     /**
      * Called when the children of the given path changed.
      *
@@ -97,7 +98,11 @@ public class ZkControllerImpl implements ZkController {
      * @throws Exception
      */
     @Override
-    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+    public void handleChildChange(String parentPath, List<String> currentChildren)
+        throws Exception {
+      if (notAValidEvent())
+        return;
+
       if (currentChildren == null) {
         // this may happen only in case of exception in ZK. It happens if the zkNode has been deleted.
         // So the notification will pass 'null' as the list of children. Exception should be visible in the logs.
@@ -106,19 +111,27 @@ public class ZkControllerImpl implements ZkController {
         return;
       }
       LOG.info(
-          "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: " + parentPath + "  Current Children: "
-              + currentChildren);
+          "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: " + parentPath +
+              "  Current Children: " + currentChildren);
       zkControllerListener.onProcessorChange(currentChildren);
+
     }
   }
 
-  class ZkJobModelVersionChangeHandler implements IZkDataListener {
+  class ZkJobModelVersionChangeHandler extends ZkUtils.GenIZkDataListener {
+
+    public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) {
+      super(zkUtils, "ZkJobModelVersionChangeHandler");
+    }
     /**
      * Called when there is a change to the data in JobModel version path
      * To the subscribers, it signifies that a new version of JobModel is available.
      */
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
+      if (notAValidEvent())
+        return;
+
       LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data="
           + data);
       zkControllerListener.onNewJobModelAvailable((String) data);
@@ -126,6 +139,9 @@ public class ZkControllerImpl implements ZkController {
 
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
+      if (notAValidEvent())
+        return;
+
       throw new SamzaException("version update path has been deleted!");
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index d0633a8..1dd5ec9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -41,7 +41,6 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
         createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
     ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry());
-
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8ca26c8..298c96e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.samza.zk;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkClient;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
@@ -37,16 +40,18 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -75,26 +80,27 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     this.config = config;
     ZkConfig zkConfig = new ZkConfig(config);
     ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+    ZkClient zkClient = ZkCoordinationServiceFactory
+        .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
+    // setup a listener for a session state change
+    // we are mostly interested in "session closed" and "new session created" events
+    zkClient.subscribeStateChanges(new ZkSessionStateChangedListener());
+
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
     this.zkUtils = new ZkUtils(
         keyBuilder,
-        ZkCoordinationServiceFactory.createZkClient(
-            zkConfig.getZkConnect(),
-            zkConfig.getZkSessionTimeoutMs(),
-            zkConfig.getZkConnectionTimeoutMs()),
+        zkClient,
         zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
 
     this.processorId = createProcessorId(config);
     LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
-    this.barrier =  new ZkBarrierForVersionUpgrade(
-        keyBuilder.getJobModelVersionBarrierPrefix(),
-        zkUtils,
+    this.barrier = new ZkBarrierForVersionUpgrade(keyBuilder.getJobModelVersionBarrierPrefix(), zkUtils,
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
-
   }
 
   @Override
@@ -102,7 +108,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     startMetrics();
     streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
 
-    debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+    debounceTimer = new ScheduleAfterDebounceTime(throwable ->
+      {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
       });
@@ -158,8 +165,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   @Override
   public void onProcessorChange(List<String> processors) {
     LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
-    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        debounceTimeMs, () -> doOnProcessorChange(processors));
+    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs,
+        () -> doOnProcessorChange(processors));
   }
 
   void doOnProcessorChange(List<String> processors) {
@@ -176,7 +183,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
     // Assign the next version of JobModel
-    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion;
     if (currentJMVersion == null) {
       nextJMVersion = "1";
@@ -260,8 +267,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
   private JobModel generateNewJobModel(List<String> processors) {
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
-        processors);
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, processors);
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
@@ -270,9 +276,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
       metrics.isLeader.set(true);
       zkController.subscribeToProcessorChange();
-      debounceTimer.scheduleAfterDebounceTime(
-        ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        debounceTimeMs, () -> {
+      debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+        {
           // actual actions to do are the same as onProcessorChange
           doOnProcessorChange(new ArrayList<>());
         });
@@ -281,6 +286,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   class ZkBarrierListenerImpl implements ZkBarrierListener {
     private final String barrierAction = "BarrierAction";
+
     private long startTime = 0;
 
     @Override
@@ -301,10 +307,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       metrics.barrierStateChange.inc();
       metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
-        debounceTimer.scheduleAfterDebounceTime(
-            barrierAction,
-          0,
-          () -> onNewJobModelConfirmed(version));
+        debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version));
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
           // no-op
@@ -323,4 +326,50 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       stop();
     }
   }
+
+  /// listener to handle session expiration
+  class ZkSessionStateChangedListener implements IZkStateListener {
+
+    @Override
+    public void handleStateChanged(Watcher.Event.KeeperState state)
+        throws Exception {
+      if (state == Watcher.Event.KeeperState.Expired) {
+        // if the session has expired it means that all the registration's ephemeral nodes are gone.
+        LOG.warn("Got session expired event for processor=" + processorId);
+
+        // increase generation of the ZK connection. All the callbacks from the previous generation will be ignored.
+        zkUtils.incGeneration();
+
+        if (coordinatorListener != null) {
+          coordinatorListener.onJobModelExpired();
+        }
+        // reset all the values that might have been from the previous session (e.g ephemeral node path)
+        zkUtils.unregister();
+
+      }
+    }
+
+    @Override
+    public void handleNewSession()
+        throws Exception {
+      LOG.info("Got new session created event for processor=" + processorId);
+
+
+      LOG.info("register zk controller for the new session");
+      zkController.register();
+    }
+
+    @Override
+    public void handleSessionEstablishmentError(Throwable error)
+        throws Exception {
+      // this means we cannot connect to zookeeper
+      LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
+      stop();
+    }
+  }
+
+  @VisibleForTesting
+  public ZkUtils getZkUtils() {
+    return zkUtils;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8caa5c6..97430cb 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -61,7 +61,7 @@ public class ZkLeaderElector implements LeaderElector {
     this.zkUtils = zkUtils;
     this.keyBuilder = zkUtils.getKeyBuilder();
     this.hostName = getHostName();
-    this.previousProcessorChangeListener = new PreviousProcessorChangeListener();
+    this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
 
     zkUtils.makeSurePersistentPathsExists(new String[]{keyBuilder.getProcessorsPath()});
   }
@@ -170,16 +170,25 @@ public class ZkLeaderElector implements LeaderElector {
   }
 
   // Only by non-leaders
-  class PreviousProcessorChangeListener implements IZkDataListener {
+  class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener {
+
+    public PreviousProcessorChangeListener(ZkUtils zkUtils) {
+      super(zkUtils, "PreviousProcessorChangeListener");
+    }
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
       LOG.debug("Data change on path: " + dataPath + " Data: " + data);
+      if (notAValidEvent())
+        return;
     }
 
     @Override
-    public void handleDataDeleted(String dataPath) throws Exception {
-      LOG.info(
-          zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
+    public void handleDataDeleted(String dataPath)
+        throws Exception {
+      LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
+      if (notAValidEvent()) {
+        return;
+      }
       tryBecomeLeader();
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 7406cf5..8b6bc52 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -54,6 +55,14 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * <p>
+ *  <b>Note on Session disconnect handling:</b>
+ *  After the session has timed out, and restored we may still get some notifications from before (from the old
+ *  session). To avoid this, we add a currentGeneration member, which starts with 0, and is increased each time
+ *  a new session is established. Current value of this member is passed to each Listener when it is created.
+ *  So if the Callback from this Listener comes with an old generation id - we ignore it.
+ * </p>
+ *
+ * <p>
  *   <b>Note on Session Management:</b>
  *   Session management, if needed, should be handled by the caller. This can be done by implementing
  *   {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
@@ -68,24 +77,43 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
+  private final AtomicInteger currentGeneration;
   private final ZkUtilsMetrics metrics;
 
+  public void incGeneration() {
+    currentGeneration.incrementAndGet();
+  }
+
+  public int getGeneration() {
+    return currentGeneration.get();
+  }
+
+
+
   public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
     this.metrics = new ZkUtilsMetrics(metricsRegistry);
+    this.currentGeneration = new AtomicInteger(0);
   }
 
   public void connect() throws ZkInterruptedException {
     boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
     if (!isConnected) {
-      metrics.zkConnectionError.inc();
+      if (metrics != null) {
+        metrics.zkConnectionError.inc();
+      }
       throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
     }
   }
 
-  ZkClient getZkClient() {
+  // reset all zk-session specific state
+  public void unregister() {
+    ephemeralPath = null;
+  }
+
+  public ZkClient getZkClient() {
     return zkClient;
   }
 
@@ -192,7 +220,9 @@ public class ZkUtils {
   String readProcessorData(String fullPath) {
     try {
       String data = zkClient.readData(fullPath, false);
-      metrics.reads.inc();
+      if (metrics != null) {
+        metrics.reads.inc();
+      }
       return data;
     } catch (Exception e) {
       throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath), e);
@@ -216,12 +246,11 @@ public class ZkUtils {
     String processorPath = keyBuilder.getProcessorsPath();
     List<String> processorIds = new ArrayList<>(znodeIds.size());
     if (znodeIds.size() > 0) {
-
       for (String child : znodeIds) {
         String fullPath = String.format("%s/%s", processorPath, child);
         processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
       }
-
+      Collections.sort(processorIds);
       LOG.info("Found these children - " + znodeIds);
       LOG.info("Found these processorIds - " + processorIds);
     }
@@ -235,12 +264,16 @@ public class ZkUtils {
 
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
     zkClient.subscribeDataChanges(path, dataListener);
-    metrics.subscriptions.inc();
+    if (metrics != null) {
+      metrics.subscriptions.inc();
+    }
   }
 
   public void subscribeChildChanges(String path, IZkChildListener listener) {
     zkClient.subscribeChildChanges(path, listener);
-    metrics.subscriptions.inc();
+    if (metrics != null) {
+      metrics.subscriptions.inc();
+    }
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
@@ -249,7 +282,9 @@ public class ZkUtils {
 
   public void writeData(String path, Object object) {
     zkClient.writeData(path, object);
-    metrics.writes.inc();
+    if (metrics != null) {
+      metrics.writes.inc();
+    }
   }
 
   public boolean exists(String path) {
@@ -261,13 +296,65 @@ public class ZkUtils {
   }
 
   /**
+   * Generation enforcing zk listener abstract class.
+   * It helps listeners, which extend it, to notAValidEvent old generation events.
+   * We cannot use 'sessionId' for this because it is not available through ZkClient (at leaste without reflection)
+   */
+  public abstract static class GenIZkChildListener implements IZkChildListener {
+    private final int generation;
+    private final ZkUtils zkUtils;
+    private final String listenerName;
+
+    public GenIZkChildListener(ZkUtils zkUtils, String listenerName) {
+      generation = zkUtils.getGeneration();
+      this.zkUtils = zkUtils;
+      this.listenerName = listenerName;
+    }
+
+    protected boolean notAValidEvent() {
+      int curGeneration = zkUtils.getGeneration();
+      if (curGeneration != generation) {
+        LOG.warn("SKIPPING handleDataChanged for " + listenerName +
+            " from wrong generation. current generation=" + curGeneration + "; callback generation= " + generation);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  public abstract static class GenIZkDataListener implements IZkDataListener {
+    private final int generation;
+    private final ZkUtils zkUtils;
+    private final String listenerName;
+
+    public GenIZkDataListener(ZkUtils zkUtils, String listenerName) {
+      generation = zkUtils.getGeneration();
+      this.zkUtils = zkUtils;
+      this.listenerName = listenerName;
+    }
+
+    protected boolean notAValidEvent() {
+      int curGeneration = zkUtils.getGeneration();
+      if (curGeneration != generation) {
+        LOG.warn("SKIPPING handleDataChanged for " + listenerName +
+            " from wrong generation. curGen=" + curGeneration + "; cb gen= " + generation);
+        return true;
+      }
+      return false;
+    }
+
+  }
+
+  /**
     * subscribe for changes of JobModel version
     * @param dataListener describe this
     */
-  public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
+  public void subscribeToJobModelVersionChange(GenIZkDataListener dataListener) {
     LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
-    metrics.subscriptions.inc();
+    if (metrics != null) {
+      metrics.subscriptions.inc();
+    }
   }
 
   /**
@@ -345,11 +432,10 @@ public class ZkUtils {
       LOG.error(msg, e);
       throw new SamzaException(msg, e);
     }
-    LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) +
-        "(actual data version after update = " + stat.getVersion() +    ")");
+    LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
+        "(actual data version after update = " + stat.getVersion() + ")");
   }
 
-
   /**
    * verify that given paths exist in ZK
    * @param paths - paths to verify or create

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 2545194..1b3b893 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -48,7 +48,7 @@ object JobConfig {
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
   val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
   val JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms"
-  val DEFAULT_DEBOUNCE_TIME_MS = 2000
+  val DEFAULT_DEBOUNCE_TIME_MS = 20000
 
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3bf5c95..b830579 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -32,7 +32,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.{ClusterManagerConfig, Config, ShellCommandConfig, StorageConfig}
+import org.apache.samza.config._
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -649,7 +649,7 @@ class SamzaContainer(
   jvm: JvmMetrics = null,
   taskThreadPool: ExecutorService = null) extends Runnable with Logging {
 
-  val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
+  val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 6f0b53a..243716e 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -80,7 +80,7 @@ class SystemConsumers (
 
   /**
    * This parameter is to define how to deal with deserialization failure. If
-   * set to true, the task will skip the messages when deserialization fails.
+   * set to true, the task will notAValidEvent the messages when deserialization fails.
    * If set to false, the task will throw SamzaException and fail the container.
    */
   dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
index 5682a7c..14c7fd1 100644
--- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
@@ -88,7 +88,7 @@ public class TestMultiFileHdfsReader {
     // read until the middle of the second file
     multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
       Arrays.asList(descriptors), offset);
-    multiReader.readNext(); // skip one duplicate event
+    multiReader.readNext(); // notAValidEvent one duplicate event
     for (; index < NUM_EVENTS + NUM_EVENTS / 2; index++) {
       IncomingMessageEnvelope envelope = multiReader.readNext();
       GenericRecord record = (GenericRecord) envelope.getMessage();
@@ -101,7 +101,7 @@ public class TestMultiFileHdfsReader {
     // read the rest of all files
     multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
       Arrays.asList(descriptors), offset);
-    multiReader.readNext(); // skip one duplicate event
+    multiReader.readNext(); // notAValidEvent one duplicate event
     while (multiReader.hasNext()) {
       IncomingMessageEnvelope envelope = multiReader.readNext();
       GenericRecord record = (GenericRecord) envelope.getMessage();
@@ -116,7 +116,7 @@ public class TestMultiFileHdfsReader {
     // reopen with the offset of the last record
     multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
       Arrays.asList(descriptors), offset);
-    multiReader.readNext(); // skip one duplicate event
+    multiReader.readNext(); // notAValidEvent one duplicate event
     Assert.assertFalse(multiReader.hasNext());
     multiReader.close();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index aa13fd8..e033b53 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -202,7 +202,7 @@ private[kafka] class KafkaSystemConsumer(
           // we need to lock.
           this.synchronized {
             // Check if we still need this TopicAndPartition inside the
-            // critical section. If we don't, then skip it.
+            // critical section. If we don't, then notAValidEvent it.
             topicPartitionsAndOffsets.get(head) match {
               case Some(nextOffset) =>
                 getHostPort(topicMetadata(head.topic), head.partition) match {

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 2aac6aa..8385c4a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -96,7 +96,7 @@ class KeyValueStorageEngine[K, V](
 
   /**
    * Restore the contents of this key/value store from the change log,
-   * batching updates to underlying raw store to skip wrapping functions for efficiency.
+   * batching updates to underlying raw store to notAValidEvent wrapping functions for efficiency.
    */
   def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
new file mode 100644
index 0000000..1a13825
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Happy path tests.
+ * Start 1, 2, 5 processors and make sure they all consume all the events.
+ */
+public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
+
+  @Override
+  protected String prefix() {
+    return "test_ZK_";
+  }
+
+  @Test
+  public void testSingleStreamProcessor() {
+    testStreamProcessor(new String[]{"1"});
+  }
+
+  @Test
+  public void testTwoStreamProcessors() {
+    testStreamProcessor(new String[]{"2", "3"});
+  }
+
+  @Test
+  public void testFiveStreamProcessors() {
+    testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
+  }
+
+  // main test method for happy path with fixed number of processors
+  private void testStreamProcessor(String[] processorIds) {
+
+    // create a latch of the size equals to the number of messages
+    TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
+
+    // initialize the processors
+    StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
+    // we need to know when the processor has started
+    CountDownLatch[] startWait = new CountDownLatch[processorIds.length];
+    for (int i = 0; i < processorIds.length; i++) {
+      startWait[i] = new CountDownLatch(1);
+      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
+    }
+
+
+    // run the processors in separate threads
+    Thread[] threads = new Thread[processorIds.length];
+    CountDownLatch[] stopLatches = new CountDownLatch[processorIds.length];
+    for (int i = 0; i < processorIds.length; i++) {
+      stopLatches[i] = new CountDownLatch(1);
+      threads[i] = runInThread(streamProcessors[i], stopLatches[i]);
+      threads[i].start();
+    }
+    // wait until the processor reports that it has started
+    for (int i = 0; i < processorIds.length; i++) {
+      waitForProcessorToStartStop(startWait[i]);
+    }
+
+    // produce messageCount messages, starting with key 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // wait until all the events are consumed
+    waitUntilMessagesLeftN(0);
+
+    // collect all the threads
+    try {
+      for (int i = 0; i < threads.length; i++) {
+        stopProcessor(stopLatches[i]);
+        threads[i].join(1000);
+      }
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    verifyNumMessages(outputTopic, messageCount, messageCount);
+  }
+
+  @Test
+  /**
+   * Similar to the previous tests, but add another processor in the middle
+   */ public void testStreamProcessorWithAdd() {
+
+    // set number of events we expect to read by both processes in total:
+    // p1 - reads 'messageCount' at first
+    // p1 and p2 read all messageCount together, since they start from the beginning.
+    // so we expect total 3 x messageCounts
+    int totalEventsToGenerate = 3 * messageCount;
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
+
+    // create first processor
+    CountDownLatch startWait1 = new CountDownLatch(1);
+    CountDownLatch stopWait1 = new CountDownLatch(1);
+    StreamProcessor sp1 = createStreamProcessor("20", map, startWait1, stopWait1);
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // start the first processor
+    CountDownLatch stopLatch1 = new CountDownLatch(1);
+    Thread t1 = runInThread(sp1, stopLatch1);
+    t1.start();
+
+    // wait until the processor reports that it has started
+    waitForProcessorToStartStop(startWait1);
+
+    // make sure it consumes all the messages from the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
+    CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
+
+    // start the second processor
+    CountDownLatch startWait2 = new CountDownLatch(1);
+    StreamProcessor sp2 = createStreamProcessor("21", map, startWait2, null);
+
+    CountDownLatch stopLatch2 = new CountDownLatch(1);
+    Thread t2 = runInThread(sp2, stopLatch2);
+    t2.start();
+
+    // wait until 2nd processor reports that it has started
+    waitForProcessorToStartStop(startWait2);
+
+    // wait until the 1st processor reports that it has stopped its container
+    LOG.info("containerStopped latch = " + containerStopped1);
+    waitForProcessorToStartStop(containerStopped1);
+
+    // let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(600);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until all the events are consumed
+    waitUntilMessagesLeftN(0);
+
+    // shutdown both
+    try {
+      stopProcessor(stopLatch1);
+      stopProcessor(stopLatch2);
+      t1.join(1000);
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage());
+    }
+
+    // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together,
+    // but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time.
+    // total number of events we gonna get is 80+40=120
+    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
+  }
+
+  @Test
+  /**
+   * same as other happy path messages, but with one processor removed in the middle
+   */ public void testStreamProcessorWithRemove() {
+
+    // set number of events we expect to read by both processes in total:
+    // p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated
+    // and p2 will read all of them from the beginning (+ 2 x messageCounts, total 3 x)
+    int totalEventsToGenerate = 3 * messageCount;
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
+
+    // create first processor
+    CountDownLatch waitStart1 = new CountDownLatch(1);
+    CountDownLatch waitStop1 = new CountDownLatch(1);
+    StreamProcessor sp1 = createStreamProcessor("30", map, waitStart1, waitStop1);
+
+    // start the first processor
+    CountDownLatch stopLatch1 = new CountDownLatch(1);
+    Thread t1 = runInThread(sp1, stopLatch1);
+    t1.start();
+
+    // start the second processor
+    CountDownLatch waitStart2 = new CountDownLatch(1);
+    CountDownLatch waitStop2 = new CountDownLatch(1);
+    StreamProcessor sp2 = createStreamProcessor("31", map, waitStart2, waitStop2);
+
+    CountDownLatch stopLatch2 = new CountDownLatch(1);
+    Thread t2 = runInThread(sp2, stopLatch2);
+    t2.start();
+
+    // wait until the processor reports that it has started
+    waitForProcessorToStartStop(waitStart1);
+
+    // wait until the processor reports that it has started
+    waitForProcessorToStartStop(waitStart2);
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // make sure they consume all the messages from the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
+    CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+
+    // stop the first processor
+    stopProcessor(stopLatch1);
+
+    // wait until it's really down
+    waitForProcessorToStartStop(waitStop1);
+
+    // processor2 will stop it container and start again.
+    // We wait for the container's stop to make sure we can count EXACTLY how many messages it reads.
+
+    LOG.info("containerStopped latch = " + containerStopped2);
+    waitForProcessorToStartStop(containerStopped2);
+
+    // let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(300);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until p2 consumes all the message by itself;
+    waitUntilMessagesLeftN(0);
+
+    // shutdown p2
+
+    try {
+      stopProcessor(stopLatch2);
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself,
+    // but the expected values are the same 0-79 - we should get each value one time.
+    // Meanwhile the number of events we gonna get is 40 + 80
+    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index a315083..4cbe252 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.processor;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -26,8 +27,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import kafka.utils.TestUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,6 +57,8 @@ import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.Util;
 import org.apache.samza.zk.TestZkUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -93,8 +99,28 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     createTopics(inputTopic, outputTopic);
   }
 
-  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map, final Object mutexStart,
-      final Object mutexStop) {
+  // session expiration simulation
+  // have to use the reflection to get the session id
+  protected void expireSession(ZkClient zkClient) {
+    ZkConnection zkConnection = null;
+    try {
+      Field privateField = ZkClient.class.getDeclaredField("_connection");
+      privateField.setAccessible(true);
+      zkConnection = (ZkConnection) privateField.get(zkClient);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      Assert.fail(e.toString());
+    }
+
+    ZooKeeper zookeeper = zkConnection.getZookeeper();
+    long sessionId = zookeeper.getSessionId();
+
+    LOG.info("Closing/expiring session:" + sessionId);
+    ZooKeeperServer zkServer = zookeeper().zookeeper();
+    zkServer.closeSession(sessionId);
+  }
+
+  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map, final CountDownLatch waitStart,
+      final CountDownLatch waitStop) {
     map.put(ApplicationConfig.PROCESSOR_ID, pId);
 
     Config config = new MapConfig(map);
@@ -105,20 +131,16 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() {
       @Override
       public void onStart() {
-        if (mutexStart != null) {
-          synchronized (mutexStart) {
-            mutexStart.notifyAll();
-          }
+        if (waitStart != null) {
+            waitStart.countDown();
         }
         LOG.info("onStart is called for pid=" + pId);
       }
 
       @Override
       public void onShutdown() {
-        if (mutexStop != null) {
-          synchronized (mutexStart) {
-            mutexStart.notify();
-          }
+        if (waitStop != null) {
+          waitStop.countDown();
         }
         LOG.info("onShutdown is called for pid=" + pId);
       }
@@ -144,7 +166,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
       int messageCount) {
     Map<String, String> configs = new HashMap<>();
     configs.putAll(StandaloneTestUtils
-        .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask"));
+        .getStandaloneConfigs("test-job", "org.apache.samza.processor.TestZkStreamProcessor.TestStreamTask"));
     configs.putAll(StandaloneTestUtils
         .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
             true));
@@ -182,23 +204,27 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
    * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout,
    * and then stopping it.
    */
-  protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
+  protected Thread runInThread(final StreamProcessor processor, CountDownLatch stopStartLatch) {
     Thread t = new Thread() {
 
       @Override
       public void run() {
+        LOG.info("about to start processor " + processor);
         processor.start();
+        LOG.info("started processor " + processor);
         try {
           // just wait
-          synchronized (this) {
-            this.wait(100000);
+          if (!stopStartLatch.await(1000000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Wait timed out for processor " + processor);
+            Assert.fail("Wait timed out for processor " + processor);
           }
-          LOG.info("notified. Abandon the wait.");
+          LOG.info("notified. Abandon the wait for processor " + processor);
         } catch (InterruptedException e) {
           LOG.error("wait interrupted" + e);
         }
-        LOG.info("Stopping the processor");
+        LOG.info("Stopping the processor" + processor);
         processor.stop();
+        LOG.info("Stopped the processor" + processor);
       }
     };
     return t;
@@ -261,26 +287,25 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
         System.out.println("2read all. current count = " + leftEventsCount);
         break;
       }
-      TestZkUtils.sleepMs(1000);
+      TestZkUtils.sleepMs(3000);
       attempts--;
     }
     Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
   }
 
-  protected void waitForProcessorToStartStop(Object waitObject) {
+  protected void waitForProcessorToStartStop(CountDownLatch waitObject) {
+    LOG.info("Waiting on " + waitObject);
     try {
-      synchronized (waitObject) {
-        waitObject.wait(1000);
+      if (!waitObject.await(30000, TimeUnit.MILLISECONDS)) {
+        Assert.fail("Timed out while waiting for the processor to start/stop.");
       }
     } catch (InterruptedException e) {
-      Assert.fail("got interrupted while waiting for the first processor to start.");
+      Assert.fail("Got interrupted while waiting for the processor to start/stop.");
     }
   }
 
-  protected void stopProcessor(Thread threadName) {
-    synchronized (threadName) {
-      threadName.notify();
-    }
+  protected void stopProcessor(CountDownLatch stopLatch) {
+    stopLatch.countDown();
   }
 
   // StreamTaskClass

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
new file mode 100644
index 0000000..374e77c
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Failure tests:
+ * ZK unavailable.
+ * One processor fails in process.
+ */
+public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
+
+  private final static int BAD_MESSAGE_KEY = 1000;
+
+  @Override
+  protected String prefix() {
+    return "test_ZK_failure_";
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+  }
+
+  @Test(expected = org.apache.samza.SamzaException.class)
+  public void testZkUnavailable() {
+    map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
+    map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
+    CountDownLatch startLatch = new CountDownLatch(1);
+    createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception
+    Assert.fail("should've thrown an exception");
+  }
+
+  @Test
+  // Test with a single processor failing.
+  // One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to
+  // throw an exception.
+  public void testFailStreamProcessor() {
+    final int numBadMessages = 4; // either of these bad messages will cause p1 to throw and exception
+    map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "100");
+    map.put("processor.id.to.fail", "101");
+
+    // set number of events we expect to read by both processes in total:
+    // p1 will read messageCount/2 messages
+    // p2 will read messageCount/2 messages
+    // numBadMessages "bad" messages will be generated
+    // p2 will read 2 of the "bad" messages
+    // p1 will fail on the first of the "bad" messages
+    // a new job model will be generated
+    // and p2 will read all 2 * messageCount messages again, + numBadMessages (all of them this time)
+    // total 2 x messageCount / 2 + numBadMessages/2 + 2 * messageCount + numBadMessages
+    int totalEventsToBeConsumed = 3 * messageCount;
+
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToBeConsumed);
+    // create first processor
+    CountDownLatch waitStart1 = new CountDownLatch(1);
+    CountDownLatch waitStop1 = new CountDownLatch(1);
+    StreamProcessor sp1 = createStreamProcessor("101", map, waitStart1, waitStop1);
+    // start the first processor
+    CountDownLatch stopLatch1 = new CountDownLatch(1);
+    Thread t1 = runInThread(sp1, stopLatch1);
+    t1.start();
+
+    // start the second processor
+    CountDownLatch waitStart2 = new CountDownLatch(1);
+    CountDownLatch waitStop2 = new CountDownLatch(1);
+    StreamProcessor sp2 = createStreamProcessor("102", map, waitStart2, waitStop2);
+
+    CountDownLatch stopLatch2 = new CountDownLatch(1);
+    Thread t2 = runInThread(sp2, stopLatch2);
+    t2.start();
+
+    // wait until the 1st processor reports that it has started
+    waitForProcessorToStartStop(waitStart1);
+
+    // wait until the 2nd processor reports that it has started
+    waitForProcessorToStartStop(waitStart2);
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // make sure they consume all the messages
+    waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
+    CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
+    CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+
+    // produce the bad messages
+    produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);
+
+    waitForProcessorToStartStop(
+        containerStopped1); // TODO: after container failure propagates to StreamProcessor change back
+
+    // wait until the 2nd processor reports that it has stopped its container
+    waitForProcessorToStartStop(containerStopped2);
+
+    // give some extra time to let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(300);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until p2 consumes all the message by itself
+    waitUntilMessagesLeftN(0);
+
+    // shutdown p2
+    try {
+      stopProcessor(stopLatch2);
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    // number of unique values we gonna read is from 0 to (2*messageCount - 1)
+    Map<Integer, Boolean> expectedValues = new HashMap<>(2 * messageCount);
+    for (int i = 0; i < 2 * messageCount; i++) {
+      expectedValues.put(i, false);
+    }
+    for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) {
+      //expectedValues.put(i, false);
+    }
+
+    verifyNumMessages(outputTopic, expectedValues, totalEventsToBeConsumed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
new file mode 100644
index 0000000..10b08d9
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.zk.ZkJobCoordinator;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Happy path tests.
+ * Start 1, 2, 5 processors and make sure they all consume all the events.
+ */
+public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
+
+  @Override
+  protected String prefix() {
+    return "test_ZKS_";
+  }
+
+  @Test
+  public void testSingleStreamProcessor() {
+    testStreamProcessorWithSessionRestart(new String[]{"1"});
+  }
+
+  @Test
+  public void testTwoStreamProcessors() {
+    testStreamProcessorWithSessionRestart(new String[]{"2", "3"});
+  }
+
+  @Test
+  public void testFiveStreamProcessors() {
+    testStreamProcessorWithSessionRestart(new String[]{"4", "5", "6", "7", "8"});
+  }
+
+  private void testStreamProcessorWithSessionRestart(String[] processorIds) {
+
+    // set shorter session expiration for the test
+    map.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, "500");
+    map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "3000");
+
+    // create a latch of the size equals to the number of messages
+    int totalEventsToGenerate = 3 * messageCount;
+    TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
+
+    // initialize the processors
+    StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
+    ZkJobCoordinator[] jobCoordinators = new ZkJobCoordinator[processorIds.length];
+    Thread[] threads = new Thread[processorIds.length];
+    CountDownLatch[] threadStopLatches = new CountDownLatch[processorIds.length];
+    CountDownLatch[] containerStopLatches = new CountDownLatch[processorIds.length];
+    // we need to know when the processor has started
+    CountDownLatch[] startWait = new CountDownLatch[processorIds.length];
+    //CountDownLatch[] stopWait = new CountDownLatch[processorIds.length];
+
+    for (int i = 0; i < processorIds.length; i++) {
+      startWait[i] = new CountDownLatch(1);
+      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
+      jobCoordinators[i] = (ZkJobCoordinator) streamProcessors[i].getCurrentJobCoordinator();
+    }
+
+    // run the processors in separate threads
+    for (int i = 0; i < processorIds.length; i++) {
+      threadStopLatches[i] = new CountDownLatch(1); // is used int stopProcessor
+      threads[i] = runInThread(streamProcessors[i], threadStopLatches[i]);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < processorIds.length; i++) {
+      // wait until the processor reports that it has started
+      waitForProcessorToStartStop(startWait[i]);
+    }
+
+    // produce messageCount messages, starting with key 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // make sure it consumes all the messages from the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
+
+    // Get the container stop latch to be able to check when a container is stopped.
+    // New jcContainerShutdownLatch is created after each onNewJobModel,
+    // so we need to get the current one, before it changed..
+    for (int i = 0; i < processorIds.length; i++) {
+      containerStopLatches[i] = streamProcessors[i].jcContainerShutdownLatch;
+    }
+
+    // expire zk session of one of the processors
+    expireSession(jobCoordinators[0].getZkUtils().getZkClient());
+
+    // wait until all other processors report that they have stopped their containers
+    for (int i = 0; i < processorIds.length; i++) {
+      waitForProcessorToStartStop(containerStopLatches[i]);
+    }
+
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    waitUntilMessagesLeftN(0);
+
+    // collect all the threads
+    try {
+      for (int i = 0; i < threads.length; i++) {
+        stopProcessor(threadStopLatches[i]);
+        threads[i].join(1000);
+      }
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
deleted file mode 100644
index 05b6ebe..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.processor;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.TestZkStreamProcessorBase;
-import org.apache.samza.zk.TestZkUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-/**
- * Happy path tests.
- * Start 1, 2, 5 processors and make sure they all consume all the events.
- */
-public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
-
-  @Override
-  protected String prefix() {
-    return "test_ZK_";
-  }
-
-  @Test
-  public void testSingleStreamProcessor() {
-    testStreamProcessor(new String[]{"1"});
-  }
-
-  @Test
-  public void testTwoStreamProcessors() {
-    testStreamProcessor(new String[]{"2", "3"});
-  }
-
-  @Test
-  public void testFiveStreamProcessors() {
-    testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
-  }
-
-  // main test method for happy path with fixed number of processors
-  private void testStreamProcessor(String[] processorIds) {
-
-    // create a latch of the size equals to the number of messages
-    TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
-
-    // initialize the processors
-    StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
-    // we need to know when the processor has started
-    Object[] startWait = new Object[processorIds.length];
-    for (int i = 0; i < processorIds.length; i++) {
-      startWait[i] = new Object();
-      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
-    }
-
-    // produce messageCount messages, starting with key 0
-    produceMessages(0, inputTopic, messageCount);
-
-    // run the processors in separate threads
-    Thread[] threads = new Thread[processorIds.length];
-    for (int i = 0; i < processorIds.length; i++) {
-      threads[i] = runInThread(streamProcessors[i], TestZkStreamProcessorBase.TestStreamTask.endLatch);
-      threads[i].start();
-      // wait until the processor reports that it has started
-      try {
-        synchronized (startWait[i]) {
-          startWait[i].wait(1000);
-        }
-      } catch (InterruptedException e) {
-        Assert.fail("got interrupted while waiting for the " + i + "th processor to start.");
-      }
-    }
-
-    // wait until all the events are consumed
-    try {
-      TestZkStreamProcessorBase.TestStreamTask.endLatch.await(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage());
-    }
-
-    // collect all the threads
-    try {
-      for (Thread t : threads) {
-        stopProcessor(t);
-        t.join(1000);
-      }
-    } catch (InterruptedException e) {
-      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
-    }
-
-    verifyNumMessages(outputTopic, messageCount, messageCount);
-  }
-
-  @Test
-  /**
-   * Similar to the previous tests, but add another processor in the middle
-   */ public void testStreamProcessorWithAdd() {
-
-    // set number of events we expect to read by both processes in total:
-    // p1 - reads 'messageCount' at first
-    // p1 and p2 read all messageCount together, since they start from the beginning.
-    // so we expect total 3 x messageCounts
-    int totalEventsToGenerate = 3 * messageCount;
-    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
-
-    // create first processor
-    Object startWait1 = new Object();
-    Object stopWait1 = new Object();
-    StreamProcessor sp = createStreamProcessor("20", map, startWait1, stopWait1);
-
-    // produce first batch of messages starting with 0
-    produceMessages(0, inputTopic, messageCount);
-
-    // start the first processor
-    Thread t1 = runInThread(sp, TestStreamTask.endLatch);
-    t1.start();
-
-    // wait until the processor reports that it has started
-    waitForProcessorToStartStop(startWait1);
-
-    // make sure it consumes all the messages from the first batch
-    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
-
-    // start the second processor
-    Object startWait2 = new Object();
-    StreamProcessor sp2 = createStreamProcessor("21", map, startWait2, null);
-    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
-    t2.start();
-
-    // wait until 2nd processor reports that it has started
-    waitForProcessorToStartStop(startWait2);
-
-    // wait until the 1st processor reports that it has stopped
-    waitForProcessorToStartStop(stopWait1);
-
-    // let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(600);
-
-    // produce the second batch of the messages, starting with 'messageCount'
-    produceMessages(messageCount, inputTopic, messageCount);
-
-    // wait until all the events are consumed
-    waitUntilMessagesLeftN(0);
-
-    // shutdown both
-    try {
-      stopProcessor(t1);
-      stopProcessor(t2);
-      t1.join(1000);
-      t2.join(1000);
-    } catch (InterruptedException e) {
-      Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage());
-    }
-
-    // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together,
-    // but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time.
-    // total number of events we gonna get is 80+40=120
-    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
-  }
-
-  @Test
-  /**
-   * same as other happy path messages, but with one processor removed in the middle
-   */ public void testStreamProcessorWithRemove() {
-
-    // set number of events we expect to read by both processes in total:
-    // p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated
-    // and p2 will read all of them from the beginning (+ 2 x messageCounts, total 3 x)
-    int totalEventsToGenerate = 3 * messageCount;
-    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
-
-    // create first processor
-    Object waitStart1 = new Object();
-    Object waitStop1 = new Object();
-    StreamProcessor sp1 = createStreamProcessor("30", map, waitStart1, waitStop1);
-
-    // start the first processor
-    Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
-    t1.start();
-
-    // start the second processor
-    Object waitStart2 = new Object();
-    Object waitStop2 = new Object();
-    StreamProcessor sp2 = createStreamProcessor("31", map, waitStart2, waitStop2);
-    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
-    t2.start();
-
-    // wait until the processor reports that it has started
-    waitForProcessorToStartStop(waitStart1);
-
-    // wait until the processor reports that it has started
-    waitForProcessorToStartStop(waitStart2);
-
-    // produce first batch of messages starting with 0
-    produceMessages(0, inputTopic, messageCount);
-
-    // make sure they consume all the messages from the first batch
-    waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
-
-    // stop the first processor
-    stopProcessor(t1);
-
-    // wait until it's really down
-    waitForProcessorToStartStop(waitStop1);
-
-    // processor1 will stop and start again. We wait for its stop to make sure we can count EXACTLY how many messages it reads.
-    waitForProcessorToStartStop(waitStop2);
-
-    // let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(300);
-
-    // produce the second batch of the messages, starting with 'messageCount'
-    produceMessages(messageCount, inputTopic, messageCount);
-
-    // wait until p2 consumes all the message by itself;
-    waitUntilMessagesLeftN(0);
-
-    // shutdown p2
-
-    try {
-      stopProcessor(t2);
-      t2.join(1000);
-    } catch (InterruptedException e) {
-      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
-    }
-
-    // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself,
-    // but the expected values are the same 0-79 - we should get each value one time.
-    // Meanwhile the number of events we gonna get is 40 + 80
-    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
-  }
-}