You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/10 23:35:02 UTC

samza git commit: SAMZA-1730: Adding state validations in StreamProcessor before any lifecycle operation and group coordination.

Repository: samza
Updated Branches:
  refs/heads/master 08cfad990 -> 93b397e84


SAMZA-1730: Adding state validations in StreamProcessor before any lifecycle operation and group coordination.

Author: Shanthoosh Venkataraman <sa...@gmail.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #535 from shanthoosh/abced


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93b397e8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93b397e8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93b397e8

Branch: refs/heads/master
Commit: 93b397e84056f311d238be72181b2df60cccb6c0
Parents: 08cfad9
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Tue Jul 10 16:34:58 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Jul 10 16:34:58 2018 -0700

----------------------------------------------------------------------
 .../samza/container/SamzaContainerListener.java |   9 +-
 .../apache/samza/processor/StreamProcessor.java | 281 ++++++++++++-------
 .../samza/runtime/LocalContainerRunner.java     |   2 +-
 .../standalone/PassthroughJobCoordinator.java   |   1 +
 .../apache/samza/container/SamzaContainer.scala |  16 +-
 .../samza/job/local/ThreadJobFactory.scala      |   2 +-
 .../samza/processor/TestStreamProcessor.java    | 162 ++++++++++-
 .../samza/container/TestSamzaContainer.scala    |  10 +-
 .../samza/processor/TestZkStreamProcessor.java  |   4 +-
 .../TestZkStreamProcessorFailures.java          |   4 +-
 .../processor/TestZkStreamProcessorSession.java |   4 +-
 .../processor/TestZkLocalApplicationRunner.java | 145 +++++++---
 12 files changed, 456 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index a9c3b2c..fe8bc66 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -37,19 +37,16 @@ public interface SamzaContainerListener {
    *  <br>
    *  <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
    *  exceptions/errors.
-   * @param pausedByJm boolean indicating why the container was stopped. It should be {@literal true}, iff the container
-   *                    was stopped as a result of an expired {@link org.apache.samza.job.model.JobModel}. Otherwise,
-   *                    it should be {@literal false}
    */
-  void onContainerStop(boolean pausedByJm);
+  void onContainerStop();
 
   /**
    *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has  transitioned to
    *  {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
    *  {@link org.apache.samza.SamzaContainerStatus}
    *  <br>
-   *  <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop(boolean)}.
-   * @param t Throwable that caused the container failure.
+   *  <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}.
+   *  @param t Throwable that caused the container failure.
    */
   void onContainerFailed(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 73f32e7..22550d5 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,6 +19,8 @@
 package org.apache.samza.processor;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -26,7 +28,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -49,10 +50,44 @@ import org.slf4j.LoggerFactory;
 /**
  * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
  * independent process.
+ *
  * <p>
  *
  * <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in
- * multiple threads.
+ * multiple threads. This class is thread safe.
+ *
+ * </p>
+ *
+ * <pre>
+ * A StreamProcessor could be in any one of the following states:
+ * NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED.
+ *
+ * Describes the valid state transitions of the {@link StreamProcessor}.
+ *
+ *
+ *                                                                                                   ────────────────────────────────
+ *                                                                                                  │                               │
+ *                                                                                                  │                               │
+ *                                                                                                  │                               │
+ *                                                                                                  │                               │
+ *     New                                StreamProcessor.start()          Rebalance triggered      V        Receives JobModel      │
+ *  StreamProcessor ──────────▶   NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
+ *   Creation                      │                                 │     by group leader          │     and starts Container      │
+ *                                 │                                 │                              │                               │
+ *                             Stre│amProcessor.stop()           Stre│amProcessor.stop()        Stre│amProcessor.stop()         Stre│amProcessor.stop()
+ *                                 │                                 │                              │                               │
+ *                                 │                                 │                              │                               │
+ *                                 │                                 │                              │                               │
+ *                                 V                                 V                              V                               V
+ *                                  ───────────────────────────▶ STOPPING ◀──────────────────────────────────────────────────────────
+ *                                                                  │
+ *                                                                  │
+ *                                            After JobCoordinator and SamzaContainer had shutdown.
+ *                                                                  │
+ *                                                                  V
+ *                                                                 STOPPED
+ *
+ * </pre>
  */
 @InterfaceStability.Evolving
 public class StreamProcessor {
@@ -69,31 +104,59 @@ public class StreamProcessor {
   private final ExecutorService executorService;
   private final Object lock = new Object();
 
-  private SamzaContainer container = null;
   private Throwable containerException = null;
   private boolean processorOnStartCalled = false;
 
-  // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
-  // stopped due to re-balancing
-  volatile CountDownLatch jcContainerShutdownLatch;
+  volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);
+
+  /**
+   * Indicates the current status of a {@link StreamProcessor}.
+   */
+  public enum State {
+    STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), IN_REBALANCE("IN_REBALANCE");
+
+    private String strVal;
+
+    State(String strVal) {
+      this.strVal = strVal;
+    }
+
+    @Override
+    public String toString() {
+      return strVal;
+    }
+  }
+
+  /**
+   * @return the current state of StreamProcessor.
+   */
+  public State getState() {
+    return state;
+  }
+
+  @VisibleForTesting
+  State state = State.NEW;
+
+  @VisibleForTesting
+  SamzaContainer container = null;
 
   @VisibleForTesting
   JobCoordinatorListener jobCoordinatorListener = null;
 
   /**
-   * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container
+   * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}.
+   *
    * <p>
-   * JobCoordinator controls how the various StreamProcessor instances belonging to a job coordinate. It is also
-   * responsible generating and updating JobModel.
-   * When StreamProcessor starts, it starts the JobCoordinator and brings up a SamzaContainer based on the JobModel.
-   * SamzaContainer is executed using an ExecutorService.
+   * On startup, StreamProcessor starts the JobCoordinator. It schedules the SamzaContainer to run in an ExecutorService
+   * when it receives a new {@link JobModel} from the JobCoordinator.
    * <p>
-   * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
    *
-   * @param config                 Instance of config object - contains all configuration required for processing
-   * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
+   * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor.
+   *
+   * @param config                 configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}.
+   * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics.
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
-   * @param processorListener         listener to the StreamProcessor life cycle
+   * @param processorListener      listener to the StreamProcessor life cycle.
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
@@ -101,7 +164,7 @@ public class StreamProcessor {
   }
 
   /**
-   *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
+   * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter
@@ -114,7 +177,7 @@ public class StreamProcessor {
   }
 
   /* package private */
-  JobCoordinator getJobCoordinator() {
+  private JobCoordinator getJobCoordinator() {
     String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config);
   }
@@ -126,6 +189,7 @@ public class StreamProcessor {
 
   StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
                   StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
+    Preconditions.checkNotNull(processorListener, "ProcessorListener cannot be null.");
     this.taskFactory = taskFactory;
     this.config = config;
     this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
@@ -148,7 +212,14 @@ public class StreamProcessor {
    * </p>
    */
   public void start() {
-    jobCoordinator.start();
+    synchronized (lock) {
+      if (state == State.NEW) {
+        state = State.STARTED;
+        jobCoordinator.start();
+      } else {
+        LOGGER.info("Start is no-op, since the current state is {} and not {}.", state, State.NEW);
+      }
+    }
   }
 
   /**
@@ -156,7 +227,7 @@ public class StreamProcessor {
    * Asynchronously stops the {@link StreamProcessor}'s running components - {@link SamzaContainer}
    * and {@link JobCoordinator}
    * </p>
-   * There are multiple ways in which the StreamProcessor stops:
+   * The StreamProcessor can be stopped in the following ways:
    * <ol>
    *   <li>Caller of StreamProcessor invokes stop()</li>
    *   <li>Samza Container completes processing (eg. bounded input) and shuts down</li>
@@ -168,7 +239,7 @@ public class StreamProcessor {
    * <br>
    * If container is running,
    * <ol>
-   *   <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop(boolean)} will trigger
+   *   <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop()} will trigger
    *   {@link JobCoordinator#stop()}</li>
    *   <li>container fails to shutdown cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)} will
    *   trigger {@link JobCoordinator#stop()}</li>
@@ -178,20 +249,22 @@ public class StreamProcessor {
    */
   public void stop() {
     synchronized (lock) {
-      boolean containerShutdownInvoked = false;
-      if (container != null) {
+      if (state != State.STOPPING && state != State.STOPPED) {
+        state = State.STOPPING;
         try {
           LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
-          container.shutdown();
-          containerShutdownInvoked = true;
-        } catch (Exception exception) {
-          LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+          boolean hasContainerShutdown = stopSamzaContainer();
+          if (!hasContainerShutdown) {
+            LOGGER.info("Interrupting the container: {} thread to die.", container);
+            executorService.shutdownNow();
+          }
+        } catch (Throwable throwable) {
+          LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable);
         }
-      }
-
-      if (!containerShutdownInvoked) {
-        LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+        LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId);
         jobCoordinator.stop();
+      } else {
+        LOGGER.info("StreamProcessor state is: {}. Ignoring the stop.", state);
       }
     }
   }
@@ -200,44 +273,51 @@ public class StreamProcessor {
     return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
   }
 
-  JobCoordinatorListener createJobCoordinatorListener() {
+  /**
+   * Stops the {@link SamzaContainer}.
+   * @return true if {@link SamzaContainer} had shutdown within task.shutdown.ms. false otherwise.
+   */
+  private boolean stopSamzaContainer() {
+    boolean hasContainerShutdown = true;
+    if (container != null) {
+      if (!container.hasStopped()) {
+        try {
+          container.shutdown();
+          LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container);
+          hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+        } catch (IllegalContainerStateException icse) {
+          LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
+        } catch (Exception e) {
+          LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
+          hasContainerShutdown = false;
+        }
+        LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown));
+      } else {
+        LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
+      }
+    }
+    return hasContainerShutdown;
+  }
+
+  private JobCoordinatorListener createJobCoordinatorListener() {
     return new JobCoordinatorListener() {
 
       @Override
       public void onJobModelExpired() {
         synchronized (lock) {
-          if (container != null) {
-            SamzaContainerStatus status = container.getStatus();
-            if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
-              boolean shutdownComplete = false;
-              try {
-                LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container,
-                    processorId);
-                container.pause();
-                shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
-                LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
-              } catch (IllegalContainerStateException icse) {
-                // Ignored since container is not running
-                LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
-                shutdownComplete = true;
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
-              } catch (Exception e) {
-                LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
-              }
-              if (!shutdownComplete) {
-                LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
-                container = null;
-                stop();
-              } else {
-                LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
-              }
+          if (state == State.STARTED || state == State.RUNNING) {
+            state = State.IN_REBALANCE;
+            LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
+            boolean hasContainerShutdown = stopSamzaContainer();
+            if (!hasContainerShutdown) {
+              LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
+              state = State.STOPPING;
+              jobCoordinator.stop();
             } else {
-              LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
+              LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
             }
           } else {
-            LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
+            LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED));
           }
         }
       }
@@ -245,35 +325,42 @@ public class StreamProcessor {
       @Override
       public void onNewJobModel(String processorId, JobModel jobModel) {
         synchronized (lock) {
-          jcContainerShutdownLatch = new CountDownLatch(1);
-          container = createSamzaContainer(processorId, jobModel);
-          container.setContainerListener(new ContainerListener());
-          LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
-          executorService.submit(container::run);
+          if (state == State.IN_REBALANCE) {
+            containerShutdownLatch = new CountDownLatch(1);
+            container = createSamzaContainer(processorId, jobModel);
+            container.setContainerListener(new ContainerListener());
+            LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+            executorService.submit(container);
+          } else {
+            LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
+          }
         }
       }
 
       @Override
       public void onCoordinatorStop() {
-        if (executorService != null) {
+        synchronized (lock) {
           LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
+          stopSamzaContainer();
           executorService.shutdownNow();
+          state = State.STOPPED;
         }
-        if (processorListener != null) {
-          if (containerException != null)
-            processorListener.onFailure(containerException);
-          else
-            processorListener.onShutdown();
-        }
+        if (containerException != null)
+          processorListener.onFailure(containerException);
+        else
+          processorListener.onShutdown();
+
       }
 
       @Override
       public void onCoordinatorFailure(Throwable throwable) {
-        LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
-        stop();
-        if (processorListener != null) {
-          processorListener.onFailure(throwable);
+        synchronized (lock) {
+          LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
+          stopSamzaContainer();
+          executorService.shutdownNow();
+          state = State.STOPPED;
         }
+        processorListener.onFailure(throwable);
       }
     };
   }
@@ -287,46 +374,36 @@ public class StreamProcessor {
 
     @Override
     public void onContainerStart() {
+      LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId);
       if (!processorOnStartCalled) {
-        // processorListener is called on start only the first time the container starts.
-        // It is not called after every re-balance of partitions among the processors
+        processorListener.onStart();
         processorOnStartCalled = true;
-        if (processorListener != null) {
-          processorListener.onStart();
-        }
-      } else {
-        LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
       }
+      state = State.RUNNING;
     }
 
     @Override
-    public void onContainerStop(boolean pauseByJm) {
-      if (pauseByJm) {
-        LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
-        if (jcContainerShutdownLatch != null) {
-          jcContainerShutdownLatch.countDown();
-        }
-      } else {  // sp.stop was called or container stopped by itself
-        LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
-        synchronized (lock) {
-          container = null; // this guarantees that stop() doesn't try to stop container again
-          stop();
+    public void onContainerStop() {
+      containerShutdownLatch.countDown();
+      synchronized (lock) {
+        if (state == State.IN_REBALANCE) {
+          LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
+        } else {
+          LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
+          state = State.STOPPING;
+          jobCoordinator.stop();
         }
       }
     }
 
     @Override
     public void onContainerFailed(Throwable t) {
-      if (jcContainerShutdownLatch != null) {
-        jcContainerShutdownLatch.countDown();
-      } else {
-        LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
-      }
+      containerShutdownLatch.countDown();
       synchronized (lock) {
-        containerException = t;
         LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
-        container = null;
-        stop();
+        state = State.STOPPING;
+        containerException = t;
+        jobCoordinator.stop();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 66176d7..e6e622d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -87,7 +87,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
           }
 
           @Override
-          public void onContainerStop(boolean invokedExternally) {
+          public void onContainerStop() {
             log.info("Container Stopped");
           }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 01ee84e..228617a 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -89,6 +89,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
     }
     if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
       if (coordinatorListener != null) {
+        coordinatorListener.onJobModelExpired();
         coordinatorListener.onNewJobModel(processorId, jobModel);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index be0fb26..89278ad 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -736,7 +736,6 @@ class SamzaContainer(
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
-  private var paused: Boolean = false
   private var containerListener: SamzaContainerListener = null
 
   def getStatus(): SamzaContainerStatus = status
@@ -747,6 +746,8 @@ class SamzaContainer(
     containerListener = listener
   }
 
+  def hasStopped(): Boolean = status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED
+
   def run {
     try {
       info("Starting container.")
@@ -824,7 +825,7 @@ class SamzaContainer(
     status match {
       case SamzaContainerStatus.STOPPED =>
         if (containerListener != null) {
-          containerListener.onContainerStop(paused)
+          containerListener.onContainerStop()
         }
       case SamzaContainerStatus.FAILED =>
         if (containerListener != null) {
@@ -833,17 +834,6 @@ class SamzaContainer(
     }
   }
 
-  // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the future so that StreamProcessor can pause and
-  // unpause the container when the jobmodel changes.
-  /**
-   * Marks the [[SamzaContainer]] as being paused by the called due to a change in [[JobModel]] and then, asynchronously
-   * shuts down this [[SamzaContainer]]
-   */
-  def pause(): Unit = {
-    paused = true
-    shutdown()
-  }
-
   /**
    * <p>
    *   Asynchronously shuts down this [[SamzaContainer]]

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 029b375..7b83874 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -94,7 +94,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         throw t
       }
 
-      override def onContainerStop(pausedOrNot: Boolean): Unit = {
+      override def onContainerStop(): Unit = {
       }
 
       override def onContainerStart(): Unit = {

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index fc1259c..052aa29 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.processor;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -28,13 +29,13 @@ import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.processor.StreamProcessor.State;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,7 +43,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -57,6 +59,7 @@ public class TestStreamProcessor {
 
   @Before
   public void before() {
+    Mockito.reset();
     processorListenerState = new ConcurrentHashMap<ListenerCallback, Boolean>() {
       {
         put(ListenerCallback.ON_START, false);
@@ -103,12 +106,12 @@ public class TestStreamProcessor {
             return null;
           }).when(mockRunLoop).run();
 
-        doAnswer(invocation ->
+        Mockito.doAnswer(invocation ->
           {
             containerStop.countDown();
             return null;
           }).when(mockRunLoop).shutdown();
-        container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, mock(StreamTask.class));
+        container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, Mockito.mock(StreamTask.class));
       }
       return container;
     }
@@ -166,6 +169,7 @@ public class TestStreamProcessor {
     final Thread jcThread = new Thread(() ->
       {
         try {
+          processor.jobCoordinatorListener.onJobModelExpired();
           processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
           coordinatorStop.await();
           processor.jobCoordinatorListener.onCoordinatorStop();
@@ -215,7 +219,7 @@ public class TestStreamProcessor {
    */
   @Test
   public void testContainerFailureCorrectlyStopsProcessor() throws InterruptedException {
-    JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     Throwable expectedThrowable =  new SamzaException("Failure in Container!");
     AtomicReference<Throwable> actualThrowable = new AtomicReference<>();
     final CountDownLatch runLoopStartedLatch = new CountDownLatch(1);
@@ -271,6 +275,7 @@ public class TestStreamProcessor {
         new Thread(() ->
           {
             try {
+              processor.jobCoordinatorListener.onJobModelExpired();
               processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
               coordinatorStop.await();
               processor.jobCoordinatorListener.onCoordinatorStop();
@@ -296,9 +301,146 @@ public class TestStreamProcessor {
     Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_FAILURE));
   }
 
-  // TODO:
-  // Test multiple start / stop and its ordering
-  // test onNewJobModel
-  // test onJobModelExpiry
-  // test Coordinator failure - correctly shutsdown the streamprocessor
+  @Test
+  public void testStartOperationShouldBeIdempotent() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    Mockito.doNothing().when(mockJobCoordinator).start();
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    Assert.assertEquals(State.NEW, streamProcessor.getState());
+    streamProcessor.start();
+
+    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+    // Calling start() a second time is expected to leave the state untouched.
+    streamProcessor.start();
+
+    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+    // Across both start() calls, the job coordinator must have been started exactly once.
+    Mockito.verify(mockJobCoordinator, Mockito.times(1)).start();
+  }
+
+  @Test
+  public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+
+    /**
+     * Scenario 1: no SamzaContainer has been assigned to the StreamProcessor and its state is STARTED.
+     * onJobModelExpired should move the state to IN_REBALANCE.
+     */
+
+    streamProcessor.start();
+
+    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+
+    streamProcessor.jobCoordinatorListener.onJobModelExpired();
+
+    Assert.assertEquals(State.IN_REBALANCE, streamProcessor.getState());
+
+    /**
+     * Scenario 2: the StreamProcessor holds a SamzaContainer whose hasStopped() is stubbed to
+     * false (with task.shutdown.ms set to 0). onJobModelExpired should move the StreamProcessor
+     * to the STOPPING state and should stop the JobCoordinator.
+     */
+    Mockito.doNothing().when(mockJobCoordinator).start();
+    Mockito.doNothing().when(mockJobCoordinator).stop();
+    Mockito.doNothing().when(mockSamzaContainer).shutdown();
+    Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
+    Mockito.when(mockSamzaContainer.getStatus())
+            .thenReturn(SamzaContainerStatus.STARTED)
+            .thenReturn(SamzaContainerStatus.STOPPED);
+    streamProcessor.container = mockSamzaContainer;
+    streamProcessor.state = State.STARTED;
+
+    streamProcessor.jobCoordinatorListener.onJobModelExpired();
+
+    Assert.assertEquals(State.STOPPING, streamProcessor.getState());
+    Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown();
+    Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
+
+    // Scenario 3: if the StreamProcessor is already IN_REBALANCE, onJobModelExpired should be a no-op.
+    streamProcessor.state = State.IN_REBALANCE;
+
+    streamProcessor.jobCoordinatorListener.onJobModelExpired();
+
+    Assert.assertEquals(State.IN_REBALANCE, streamProcessor.state);
+  }
+
+  @Test
+  public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
+    // Put the processor in IN_REBALANCE with a mock container attached, then deliver a new job model.
+    streamProcessor.container = mockSamzaContainer;
+    streamProcessor.state = State.IN_REBALANCE;
+    Mockito.doNothing().when(mockSamzaContainer).run();
+
+    streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>()));
+    // NOTE(review): atMost(1) is also satisfied when run() is never invoked; confirm that is intended.
+    Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run();
+  }
+
+  @Test
+  public void testStopShouldBeIdempotent() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
+
+    Mockito.doNothing().when(mockJobCoordinator).stop();
+    Mockito.doNothing().when(mockSamzaContainer).shutdown();
+    Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
+    Mockito.when(mockSamzaContainer.getStatus())
+           .thenReturn(SamzaContainerStatus.STARTED)
+           .thenReturn(SamzaContainerStatus.STOPPED);
+    // NOTE(review): mockSamzaContainer is stubbed above but never assigned to streamProcessor.container.
+    streamProcessor.state = State.RUNNING;
+    // NOTE(review): stop() is invoked only once here, so repeated-stop behavior is not exercised.
+    streamProcessor.stop();
+
+    Assert.assertEquals(State.STOPPING, streamProcessor.state);
+  }
+
+  @Test
+  public void testCoordinatorFailureShouldStopTheStreamProcessor() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+
+    Exception failureException = new Exception("dummy exception");
+
+    // Stub the container before triggering the failure so the stubs are in effect during the call under test.
+    Mockito.doNothing().when(mockSamzaContainer).shutdown();
+    Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
+    streamProcessor.container = mockSamzaContainer;
+    streamProcessor.state = State.RUNNING;
+    streamProcessor.jobCoordinatorListener.onCoordinatorFailure(failureException);
+
+    // The processor must end up STOPPED, report the failure to the listener and shut the container down.
+    Assert.assertEquals(State.STOPPED, streamProcessor.state);
+    Mockito.verify(lifecycleListener).onFailure(failureException);
+    Mockito.verify(mockSamzaContainer).shutdown();
+  }
+
+  @Test
+  public void testCoordinatorStopShouldStopTheStreamProcessor() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    // No container is attached in this test; only the processor state is forced to RUNNING.
+    streamProcessor.state = State.RUNNING;
+    streamProcessor.jobCoordinatorListener.onCoordinatorStop();
+    // Expect the processor to end in STOPPED and the lifecycle listener to be told about the shutdown.
+    Assert.assertEquals(State.STOPPED, streamProcessor.state);
+    Mockito.verify(lifecycleListener).onShutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index b27b151..9aca45e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -203,7 +203,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(invokedExternally: Boolean): Unit = {
+      override def onContainerStop(): Unit = {
         onContainerStopCalled = true
       }
 
@@ -284,7 +284,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(invokedExternally: Boolean): Unit = {
+      override def onContainerStop(): Unit = {
         onContainerStopCalled = true
       }
 
@@ -367,7 +367,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(invokedExternally: Boolean): Unit = {
+      override def onContainerStop(): Unit = {
         onContainerStopCalled = true
       }
 
@@ -451,7 +451,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
           onContainerFailedThrowable = t
         }
 
-        override def onContainerStop(invokedExternally: Boolean): Unit = {
+        override def onContainerStop(): Unit = {
           onContainerStopCalled = true
         }
 
@@ -530,7 +530,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
           onContainerFailedThrowable = t
         }
 
-        override def onContainerStop(invokedExternally: Boolean): Unit = {
+        override def onContainerStop(): Unit = {
           onContainerStopCalled = true
         }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
index 7253b29..5c28553 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
@@ -128,7 +128,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
 
     // make sure it consumes all the messages from the first batch
     waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
-    CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
+    CountDownLatch containerStopped1 = sp1.containerShutdownLatch;
 
     // start the second processor
     CountDownLatch startWait2 = new CountDownLatch(1);
@@ -211,7 +211,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
 
     // make sure they consume all the messages from the first batch
     waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
-    CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+    CountDownLatch containerStopped2 = sp2.containerShutdownLatch;
 
     // stop the first processor
     stopProcessor(stopLatch1);

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
index 374e77c..fb9c66b 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
@@ -108,8 +108,8 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
 
     // make sure they consume all the messages
     waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
-    CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
-    CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+    CountDownLatch containerStopped1 = sp1.containerShutdownLatch;
+    CountDownLatch containerStopped2 = sp2.containerShutdownLatch;
 
     // produce the bad messages
     produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
index 40eeaf0..f518c0a 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
@@ -98,10 +98,10 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
     waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
 
     // Get the container stop latch to be able to check when a container is stopped.
-    // New jcContainerShutdownLatch is created after each onNewJobModel,
+    // New containerShutdownLatch is created after each onNewJobModel,
     // so we need to get the current one, before it changed..
     for (int i = 0; i < processorIds.length; i++) {
-      containerStopLatches[i] = streamProcessors[i].jcContainerShutdownLatch;
+      containerStopLatches[i] = streamProcessors[i].containerShutdownLatch;
     }
 
     // expire zk session of one of the processors

http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index ea44052..bfa78a0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback;
-import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
@@ -279,16 +278,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Job model before and after the addition of second stream processor should be the same.
     assertEquals(previousJobModel[0], updatedJobModel);
     assertEquals(new MapConfig(), updatedJobModel.getConfig());
-    // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
-    // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
     assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
-
-    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
-    // localApplicationRunner1.kill(streamApp1);
-    // localApplicationRunner2.kill(streamApp2);
-
-    // localApplicationRunner1.waitForFinish();
-    // localApplicationRunner2.waitForFinish();
+    localApplicationRunner1.kill(streamApp1);
+    localApplicationRunner1.waitForFinish();
+    localApplicationRunner2.kill(streamApp2);
+    localApplicationRunner2.waitForFinish();
+    assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+    assertEquals(localApplicationRunner2.status(streamApp2), ApplicationStatus.UnsuccessfulFinish);
   }
 
   /**
@@ -387,13 +383,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     processedMessagesLatch.await();
 
     assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2));
-
-    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
-    // localApplicationRunner1.kill(streamApp1);
-    // localApplicationRunner2.kill(streamApp2);
-
-    // localApplicationRunner1.waitForFinish();
-    // localApplicationRunner2.waitForFinish();
+    localApplicationRunner1.kill(streamApp1);
+    localApplicationRunner1.waitForFinish();
+    localApplicationRunner2.kill(streamApp2);
+    localApplicationRunner2.waitForFinish();
+    assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
   }
 
   @Test
@@ -439,7 +433,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     applicationRunner1.kill(streamApp1);
     applicationRunner1.waitForFinish();
 
-    // How do you know here that leader has been reelected.
+    assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
 
     kafkaEventsConsumedLatch.await();
     publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -458,12 +452,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
     assertEquals(2, jobModel.getContainers().size());
 
-    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
-    // applicationRunner2.kill(streamApp2);
-    // applicationRunner3.kill(streamApp3);
-
-    // applicationRunner2.waitForFinish();
-    // applicationRunner3.waitForFinish();
+    applicationRunner2.kill(streamApp2);
+    applicationRunner2.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner2.status(streamApp2));
+    applicationRunner3.kill(streamApp3);
+    applicationRunner3.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner3.status(streamApp3));
   }
 
   @Test
@@ -501,12 +495,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     try {
       applicationRunner3.run(streamApp3);
     } finally {
-      // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
-      // applicationRunner1.kill(streamApp1);
-      // applicationRunner2.kill(streamApp2);
+      applicationRunner1.kill(streamApp1);
+      applicationRunner2.kill(streamApp2);
 
-      // applicationRunner1.waitForFinish();
-      // applicationRunner2.waitForFinish();
+      applicationRunner1.waitForFinish();
+      applicationRunner2.waitForFinish();
     }
   }
 
@@ -526,15 +519,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
     LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
 
-    List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
-    StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m);
-
     // Create StreamApplication from configuration.
     CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
     StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
 
     // Run stream application.
@@ -551,10 +541,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     applicationRunner1.kill(streamApp1);
     applicationRunner1.waitForFinish();
 
+    assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+
     LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
     processedMessagesLatch1 = new CountDownLatch(1);
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
-    streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+    streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
     applicationRunner4.run(streamApp1);
 
     processedMessagesLatch1.await();
@@ -566,12 +558,85 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
     assertEquals(jobModel.getContainers(), newJobModel.getContainers());
 
-    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
-    // applicationRunner2.kill(streamApp2);
-    // applicationRunner4.kill(streamApp1);
+    applicationRunner2.kill(streamApp2);
+    applicationRunner2.waitForFinish();
+    assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish);
+    applicationRunner4.kill(streamApp1);
+    applicationRunner4.waitForFinish();
+    assertEquals(applicationRunner4.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+  }
+
+  @Test
+  public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContainerShutdownTime() throws Exception {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+    configMap.put(TaskConfig.SHUTDOWN_MS(), "0");
+
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    Config applicationConfig1 = new MapConfig(configMap);
+
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    Config applicationConfig2 = new MapConfig(configMap);
+
+    LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+    LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
+
+    // Create StreamApplication from configuration.
+    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+
+    applicationRunner1.run(streamApp1);
+    applicationRunner2.run(streamApp2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+    kafkaEventsConsumedLatch.await();
+
+    // At this stage, both processors are running and have drained the Kafka source.
+    // Trigger the re-balancing phase by manually adding a new processor.
+
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+    Config applicationConfig3 = new MapConfig(configMap);
+
+    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
+    CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    // applicationRunner2.waitForFinish();
-    // applicationRunner4.waitForFinish();
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+    applicationRunner3.run(streamApp3);
+
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    processedMessagesLatch3.await();
+
+    /**
+     * If the processing has started in the third stream processor, then other two stream processors should be stopped.
+     */
+    // TODO: This is a bug! Status should be unsuccessful finish.
+    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp1));
+    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner2.status(streamApp2));
+
+    applicationRunner3.kill(streamApp3);
+    applicationRunner3.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner3.status(streamApp3));
+  }
+
+  private static class TestKafkaEvent implements Serializable {
+
+    // Actual content of the event.
+    private String eventData;
+
+    // Contains Integer value, which is greater than previous message id.
+    private String eventId;
+
+    TestKafkaEvent(String eventId, String eventData) {
+      this.eventId = eventId;
+      this.eventData = eventData;
+    }
   }
 
 }