Posted to commits@samza.apache.org by ni...@apache.org on 2017/11/14 20:45:32 UTC

samza git commit: SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes…

Repository: samza
Updated Branches:
  refs/heads/master 3750f5e24 -> 6fcf7f3f4


SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes in input topic partitions

Some highlights of the changes:
- always instantiate StreamPartitionCountMonitor on all input system streams now
-- it is debatable whether we want to include systems that do not implement the optimized ExtendedSystemAdmin interface. We may need to configure a long partition monitor interval for that case, and for the case where there are many input topics. (Pending perf test)
- moved the instantiation of StreamPartitionCountMonitor out of JobModelManager and allow ClusterBasedJobCoordinator to attach a callback method directly to the monitor (see the sketch after this list)
- allow callbacks to set a different application status code before throwing an exception to shut down the job
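
A minimal sketch (not part of this commit) of how a coordinator can wire a callback into the new monitor constructor, modeled on the getPartitionCountMonitor() method added to ClusterBasedJobCoordinator in the diff below. The system/stream names, the 60-second period, and the sketch's class and holder names are illustrative placeholders, not values from the commit:

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.PartitionChangeException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;

public class PartitionMonitorSketch {

  /**
   * Builds a monitor whose callback stashes a PartitionChangeException for the caller's run loop
   * to re-throw, mirroring the pattern used by ClusterBasedJobCoordinator in this change.
   */
  public static StreamPartitionCountMonitor buildMonitor(Config config,
      AtomicReference<Exception> failureHolder) {
    // Illustrative input stream; the real coordinator derives this set from task.inputs.
    Set<SystemStream> inputs = Collections.singleton(new SystemStream("kafka", "page-views"));
    Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
    StreamMetadataCache metadataCache =
        new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());

    return new StreamPartitionCountMonitor(
        inputs,                    // input streams to watch for partition count changes
        metadataCache,             // re-fetches partition counts on each poll
        new MetricsRegistryMap(),  // a gauge per stream reports the partition count delta
        60000,                     // poll period in ms; a non-positive value disables the scheduled poll
        streamsChanged ->
            // Record the failure instead of throwing here: the monitor's poll loop catches and
            // logs exceptions, so the coordinator's main loop is what actually shuts the job down.
            // With durable (changelog-backed) stores the coordinator also marks the app FAILED;
            // otherwise the status stays UNDEFINED so that YARN restarts the job.
            failureHolder.set(
                new PartitionChangeException("Partition count changed for: " + streamsChanged)));
  }
}

The caller would start() the returned monitor and periodically check failureHolder in its run loop, which is essentially what ClusterBasedJobCoordinator.run() does via checkAndThrowException() in the diff below.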

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>

Closes #351 from nickpan47/restart-on-partition-change and squashes the following commits:

8d04cd6 [Yi Pan (Data Infrastructure)] SAMZA-1482: restart or fail the job when input topic partition count changes
ee3fa65 [Yi Pan (Data Infrastructure)] SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes in input topic partitions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6fcf7f3f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6fcf7f3f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6fcf7f3f

Branch: refs/heads/master
Commit: 6fcf7f3f4cbf8b9c6f69b292e3c1aaa239ab18d3
Parents: 3750f5e
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Nov 14 12:45:23 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 14 12:45:23 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/PartitionChangeException.java  |  31 ++++
 .../ClusterBasedJobCoordinator.java             | 108 +++++++++--
 .../clustermanager/ContainerProcessManager.java |  26 ++-
 .../clustermanager/SamzaApplicationState.java   |   7 +-
 .../StreamPartitionCountMonitor.java            | 116 +++++++-----
 .../org/apache/samza/config/JobConfig.scala     |   2 -
 .../org/apache/samza/config/StorageConfig.scala |   8 +
 .../samza/coordinator/JobModelManager.scala     |  49 ++---
 .../main/scala/org/apache/samza/util/Util.scala |   1 -
 .../MockClusterResourceManagerFactory.java      |  32 ++++
 .../clustermanager/MockContainerListener.java   |   1 +
 .../TestClusterBasedJobCoordinator.java         | 108 +++++++++++
 .../clustermanager/TestContainerAllocator.java  |  12 +-
 .../TestContainerProcessManager.java            |  22 +--
 .../TestHostAwareContainerAllocator.java        |  16 +-
 .../coordinator/JobModelManagerTestUtil.java    |   4 +-
 .../samza/coordinator/TestJobModelManager.java  |  24 +--
 .../samza/storage/MockSystemConsumer.java       |  59 ------
 .../apache/samza/storage/MockSystemFactory.java |  45 -----
 .../samza/storage/TestStorageRecovery.java      |  37 +---
 .../apache/samza/system/MockSystemFactory.java  | 181 +++++++++++++++++++
 .../samza/coordinator/TestJobCoordinator.scala  |  65 +------
 .../TestStreamPartitionCountMonitor.scala       |  82 ++++++++-
 .../TestRangeSystemStreamPartitionMatcher.scala |   1 -
 .../TestRegexSystemStreamPartitionMatcher.scala |   1 -
 25 files changed, 693 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
new file mode 100644
index 0000000..4619dfa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+
+/**
+  * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
+  */
+public class PartitionChangeException extends SamzaException {
+
+  public PartitionChangeException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index d0d4e34..3d67cae 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -18,21 +18,36 @@
  */
 package org.apache.samza.clustermanager;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
 import org.apache.samza.SamzaException;
+import org.apache.samza.PartitionChangeException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 /**
  * Implements a JobCoordinator that is completely independent of the underlying cluster
  * manager system. This {@link ClusterBasedJobCoordinator} handles functionality common
@@ -69,10 +84,6 @@ public class ClusterBasedJobCoordinator {
    */
   private final SamzaApplicationState state;
 
-  /**
-   * Metrics to track stats around container failures, needed containers etc.
-   */
-
   //even though some of these can be converted to local variables, it will not be the case
   //as we add more methods to the JobCoordinator and completely implement SAMZA-881.
 
@@ -101,8 +112,30 @@ public class ClusterBasedJobCoordinator {
    */
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+  /**
+   * A boolean variable indicating whether the job has durable state stores in the configuration
+   */
+  private final boolean hasDurableStores;
+
+  /**
+   * The input topic partition count monitor
+   */
+  private final StreamPartitionCountMonitor partitionMonitor;
+
+  /**
+   * Metrics to track stats around container failures, needed containers etc.
+   */
+  private final MetricsRegistryMap metrics;
+
+  /**
+   * Internal variable for the instance of {@link JmxServer}
+   */
   private JmxServer jmxServer;
 
+  /**
+   * Variable to keep the callback exception
+   */
+  volatile private Exception coordinatorException = null;
 
   /**
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
@@ -113,22 +146,23 @@ public class ClusterBasedJobCoordinator {
    */
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
 
-    MetricsRegistryMap registry = new MetricsRegistryMap();
+    metrics = new MetricsRegistryMap();
 
     //build a JobModelReader and perform partition assignments.
-    jobModelManager = buildJobModelManager(coordinatorSystemConfig, registry);
+    jobModelManager = buildJobModelManager(coordinatorSystemConfig, metrics);
     config = jobModelManager.jobModel().getConfig();
+    hasDurableStores = new StorageConfig(config).hasDurableStores();
     state = new SamzaApplicationState(jobModelManager);
+    partitionMonitor = getPartitionCountMonitor(config);
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabled();
 
     jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
 
     // build a container process Manager
-    containerProcessManager = new ContainerProcessManager(config, state, registry);
+    containerProcessManager = new ContainerProcessManager(config, state, metrics);
   }
 
-
   /**
    * Starts the JobCoordinator.
    *
@@ -152,10 +186,11 @@ public class ClusterBasedJobCoordinator {
       log.info("Starting Cluster Based Job Coordinator");
 
       containerProcessManager.start();
+      partitionMonitor.start();
 
       boolean isInterrupted = false;
 
-      while (!containerProcessManager.shouldShutdown() && !isInterrupted) {
+      while (!containerProcessManager.shouldShutdown() && !checkAndThrowException() && !isInterrupted) {
         try {
           Thread.sleep(jobCoordinatorSleepInterval);
         } catch (InterruptedException e) {
@@ -172,19 +207,25 @@ public class ClusterBasedJobCoordinator {
     }
   }
 
+  private boolean checkAndThrowException() throws Exception {
+    if (coordinatorException != null) {
+      throw coordinatorException;
+    }
+    return false;
+  }
+
   /**
    * Stops all components of the JobCoordinator.
    */
   private void onShutDown() {
 
-    if (containerProcessManager != null) {
-      try {
-        containerProcessManager.stop();
-      } catch (Throwable e) {
-        log.error("Exception while stopping task manager {}", e);
-      }
-      log.info("Stopped task manager");
+    try {
+      partitionMonitor.stop();
+      containerProcessManager.stop();
+    } catch (Throwable e) {
+      log.error("Exception while stopping task manager {}", e);
     }
+    log.info("Stopped task manager");
 
     if (jmxServer != null) {
       try {
@@ -201,6 +242,41 @@ public class ClusterBasedJobCoordinator {
     return jobModelManager;
   }
 
+  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
+    Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());
+    Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+    if (inputStreamsToMonitor.isEmpty()) {
+      throw new SamzaException("Input streams to a job can not be empty.");
+    }
+
+    return new StreamPartitionCountMonitor(
+        inputStreamsToMonitor,
+        streamMetadata,
+        metrics,
+        new JobConfig(config).getMonitorPartitionChangeFrequency(),
+        streamsChanged -> {
+        // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
+        if (hasDurableStores) {
+          log.error("Input topic partition count changed in a job with durable state. Failing the job.");
+          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+        }
+        coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
+      });
+  }
+
+  // The following two methods are package-private and for testing only
+  @VisibleForTesting
+  SamzaApplicationState.SamzaAppStatus getAppStatus() {
+    // make sure to only return a unmodifiable copy of the status variable
+    final SamzaApplicationState.SamzaAppStatus copy = state.status;
+    return copy;
+  }
+
+  @VisibleForTesting
+  StreamPartitionCountMonitor getPartitionMonitor() {
+    return partitionMonitor;
+  }
 
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 2861e9e..6a18b84 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
 /**
  * ContainerProcessManager is responsible for requesting containers, handling failures, and notifying the application master that the
  * job is done.
@@ -85,9 +88,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    */
   private volatile boolean tooManyFailedContainers = false;
 
+  /**
+   * Exception thrown in callbacks, such as {@code containerAllocator}
+   */
   private volatile Throwable exceptionOccurred = null;
 
-
   /**
    * A map that keeps track of how many times each container failed. The key is the container ID, and the
    * value is the {@link ResourceFailure} object that has a count of failures.
@@ -95,9 +100,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    */
   private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
 
+  /**
+   * Metrics for {@link ContainerProcessManager}
+   */
   private final ContainerProcessManagerMetrics metrics;
 
-
   public ContainerProcessManager(Config config,
                                  SamzaApplicationState state,
                                  MetricsRegistryMap registry) {
@@ -108,7 +115,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
 
     ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
-    this.clusterResourceManager = factory.getClusterResourceManager(this, state);
+    this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
 
     if (this.hostAffinityEnabled) {
@@ -189,6 +196,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     containerAllocator.stop();
     try {
       allocatorThread.join();
+      log.info("Stopped container allocator");
     } catch (InterruptedException ie) {
       log.error("Allocator Thread join() threw an interrupted exception", ie);
       Thread.currentThread().interrupt();
@@ -197,19 +205,17 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     if (metrics != null) {
       try {
         metrics.stop();
+        log.info("Stopped metrics reporters");
       } catch (Throwable e) {
         log.error("Exception while stopping metrics {}", e);
       }
-      log.info("Stopped metrics reporters");
     }
 
-    if (clusterResourceManager != null) {
-      try {
-        clusterResourceManager.stop(state.status);
-      } catch (Throwable e) {
-        log.error("Exception while stopping cluster resource manager {}", e);
-      }
+    try {
+      clusterResourceManager.stop(state.status);
       log.info("Stopped cluster resource manager");
+    } catch (Throwable e) {
+      log.error("Exception while stopping cluster resource manager {}", e);
     }
 
     log.info("Finished stop of Container process manager");

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 653fb4e..adc6e51 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -42,6 +42,9 @@ public class SamzaApplicationState {
 
   public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED }
 
+  /**
+   * {@link JobModelManager} object associated with this {@link SamzaApplicationState}
+   */
   public final JobModelManager jobModelManager;
 
   /**
@@ -102,9 +105,9 @@ public class SamzaApplicationState {
   public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
 
   /**
-   * Final status of the application
+   * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads.
    */
-  public SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
+  public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
 
   /**
    * State indicating whether the job is healthy or not

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index b35cbff..16e8221 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -18,8 +18,10 @@
  */
 package org.apache.samza.coordinator;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -28,7 +30,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -48,10 +50,10 @@ public class StreamPartitionCountMonitor {
 
   private final Set<SystemStream> streamsToMonitor;
   private final StreamMetadataCache metadataCache;
-  private final MetricsRegistryMap metrics;
   private final int monitorPeriodMs;
   private final Map<SystemStream, Gauge<Integer>> gauges;
   private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
+  private final Callback callbackMethod;
 
   // Used to guard write access to state.
   private final Object lock = new Object();
@@ -61,6 +63,19 @@ public class StreamPartitionCountMonitor {
 
   private volatile State state = State.INIT;
 
+  /**
+   * A callback that is invoked when the {@link StreamPartitionCountMonitor} detects a change in the partition count of
+   * any of its {@link SystemStream}s.
+   */
+  public interface Callback {
+    /**
+     * Method to be called when SSP changes detected in the input
+     *
+     * @param streamsChanged the set of {@link SystemStream}s that have partition count changes
+     */
+    void onSystemStreamPartitionChange(Set<SystemStream> streamsChanged);
+  }
+
 
   /**
    * Gets the metadata for all the specified system streams from the provided metadata cache.
@@ -88,14 +103,15 @@ public class StreamPartitionCountMonitor {
    * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
    * @param metrics           the metrics registry to which the metrics should be added.
    * @param monitorPeriodMs   the period at which the monitor will run in milliseconds.
+   * @param monitorCallback   the callback method to be invoked when partition count changes are detected
    */
   public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
-      MetricsRegistryMap metrics, int monitorPeriodMs) {
+      MetricsRegistry metrics, int monitorPeriodMs, Callback monitorCallback) {
     this.streamsToMonitor = streamsToMonitor;
     this.metadataCache = metadataCache;
-    this.metrics = metrics;
     this.monitorPeriodMs = monitorPeriodMs;
     this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
+    this.callbackMethod = monitorCallback;
 
     // Pre-populate the gauges
     Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
@@ -109,48 +125,20 @@ public class StreamPartitionCountMonitor {
   }
 
   /**
-   * Fetches the current partition count for each system stream from the cache, compares the current count to the
-   * original count and updates the metric for that system stream with the delta.
-   */
-  void updatePartitionCountMetric() {
-    try {
-      Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
-
-      for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
-        SystemStream systemStream = metadataEntry.getKey();
-        SystemStreamMetadata metadata = metadataEntry.getValue();
-
-        int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().keySet().size();
-        int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().keySet().size();
-
-        Gauge gauge = gauges.get(systemStream);
-        gauge.set(currentPartitionCount - prevPartitionCount);
-      }
-    } catch (Exception e) {
-      log.error("Exception while updating partition count metric.", e);
-    }
-  }
-
-  /**
-   * For testing. Returns the metrics.
-   */
-  Map<SystemStream, Gauge<Integer>> getGauges() {
-    return gauges;
-  }
-
-  /**
    * Starts the monitor.
    */
   public void start() {
     synchronized (lock) {
       switch (state) {
         case INIT:
-          schedulerService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-              updatePartitionCountMetric();
-            }
-          }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+          if (monitorPeriodMs > 0) {
+            schedulerService.scheduleAtFixedRate(new Runnable() {
+              @Override
+              public void run() {
+                updatePartitionCountMetric();
+              }
+            }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+          }
 
           state = State.RUNNING;
           break;
@@ -179,8 +167,53 @@ public class StreamPartitionCountMonitor {
   }
 
   /**
-   * For testing.
+   * Fetches the current partition count for each system stream from the cache, compares the current count to the
+   * original count and updates the metric for that system stream with the delta.
+   */
+  @VisibleForTesting
+  public void updatePartitionCountMetric() {
+    try {
+      Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
+      Set<SystemStream> streamsChanged = new HashSet<>();
+
+      for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
+        try {
+          SystemStream systemStream = metadataEntry.getKey();
+          SystemStreamMetadata metadata = metadataEntry.getValue();
+
+          int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().size();
+          int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().size();
+
+          Gauge gauge = gauges.get(systemStream);
+          gauge.set(currentPartitionCount - prevPartitionCount);
+          if (currentPartitionCount != prevPartitionCount) {
+            log.warn(String.format("Change of partition count detected in stream %s. old partition count: %d, current partition count: %d",
+                systemStream.toString(), prevPartitionCount, currentPartitionCount));
+            streamsChanged.add(systemStream);
+          }
+        } catch (Exception e) {
+          log.error(String.format("Error comparing partition count differences for stream: %s", metadataEntry.getKey().toString()));
+        }
+      }
+
+      if (!streamsChanged.isEmpty() && this.callbackMethod != null) {
+        this.callbackMethod.onSystemStreamPartitionChange(streamsChanged);
+      }
+
+    } catch (Exception e) {
+      log.error("Exception while updating partition count metric.", e);
+    }
+  }
+
+  /**
+   * For testing. Returns the metrics.
    */
+  @VisibleForTesting
+  Map<SystemStream, Gauge<Integer>> getGauges() {
+    return gauges;
+  }
+
+  @VisibleForTesting
   boolean isRunning() {
     return state == State.RUNNING;
   }
@@ -191,6 +224,7 @@ public class StreamPartitionCountMonitor {
    * <p>
    * This is currently exposed at the package private level for tests only.
    */
+  @VisibleForTesting
   boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
     return schedulerService.awaitTermination(timeout, unit);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1b3b893..083dbaf 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -137,8 +137,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
-  def getMonitorPartitionChange = getBoolean(JobConfig.MONITOR_PARTITION_CHANGE, false)
-
   def getMonitorPartitionChangeFrequency = getInt(
     JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
     JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index 0e3d568..ad03e59 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -98,4 +98,12 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
       .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
       .contains(systemName)
   }
+
+  /**
+    * Helper method to check if there is any stores configured w/ a changelog
+    */
+  def hasDurableStores : Boolean = {
+    val conf = config.subset("stores.", true)
+    conf.asScala.keys.exists(k => k.endsWith(".changelog"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 92c3663..e915a8a 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -43,17 +43,10 @@ import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.ExtendedSystemAdmin
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionMatcher
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.StreamSpec
+import org.apache.samza.system._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
+import org.apache.samza.{Partition, PartitionChangeException, SamzaException}
 
 import scala.collection.JavaConverters._
 
@@ -110,22 +103,8 @@ object JobModelManager extends Logging {
     val systemAdmins = getSystemAdmins(config)
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
-    var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
-    if (config.getMonitorPartitionChange) {
-      val extendedSystemAdmins = systemAdmins.filter{
-        case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
-      }
-      val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem))
-      if (inputStreamsToMonitor.nonEmpty) {
-        streamPartitionCountMonitor = new StreamPartitionCountMonitor(
-          inputStreamsToMonitor.asJava,
-          streamMetadataCache,
-          metricsRegistryMap,
-          config.getMonitorPartitionChangeFrequency)
-      }
-    }
     val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
-    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
+    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, null)
     val jobModel = jobModelManager.jobModel
     // Save the changelog mapping back to the ChangelogPartitionmanager
     // newChangelogPartitionMapping is the merging of all current task:changelog
@@ -145,6 +124,13 @@ object JobModelManager extends Logging {
 
     jobModelManager
   }
+
+  /**
+    * This method creates a {@link JobModelManager} object w/o {@link StreamPartitionCountMonitor}
+    *
+    * @param coordinatorSystemConfig configuration for coordinator system
+    * @return a JobModelManager object
+    */
   def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
 
   /**
@@ -154,14 +140,13 @@ object JobModelManager extends Logging {
                                 changeLogMapping: util.Map[TaskName, Integer],
                                 localityManager: LocalityManager,
                                 streamMetadataCache: StreamMetadataCache,
-                                streamPartitionCountMonitor: StreamPartitionCountMonitor,
                                 containerIds: java.util.List[String]) = {
     val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
     jobModelRef.set(jobModel)
 
     val server = new HttpServer
     server.addServlet("/", new JobServlet(jobModelRef))
-    currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor)
+    currentJobModelManager = new JobModelManager(jobModel, server)
     currentJobModelManager
   }
 
@@ -337,7 +322,6 @@ object JobModelManager extends Logging {
   }
 
   private def getSystemNames(config: Config) = config.getSystemNames.toSet
-
 }
 
 /**
@@ -361,8 +345,7 @@ class JobModelManager(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null,
-  val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging {
+  val server: HttpServer = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -370,10 +353,6 @@ class JobModelManager(
     if (server != null) {
       debug("Starting HTTP server.")
       server.start
-      if (streamPartitionCountMonitor != null) {
-        debug("Starting Stream Partition Count Monitor..")
-        streamPartitionCountMonitor.start()
-      }
       info("Started HTTP server: %s" format server.getUrl)
     }
   }
@@ -381,10 +360,6 @@ class JobModelManager(
   def stop {
     if (server != null) {
       debug("Stopping HTTP server.")
-      if (streamPartitionCountMonitor != null) {
-        debug("Stopping Stream Partition Count Monitor..")
-        streamPartitionCountMonitor.stop()
-      }
       server.stop
       info("Stopped HTTP server.")
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 46bdb75..cc2a097 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -218,7 +218,6 @@ object Util extends Logging {
         JobConfig.JOB_NAME -> jobName,
         JobConfig.JOB_ID -> jobId,
         JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
-        JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange),
         JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
     new MapConfig(map.asJava)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
new file mode 100644
index 0000000..3a464c2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * Mock {@link ResourceManagerFactory} used in unit tests
+ */
+public class MockClusterResourceManagerFactory implements ResourceManagerFactory {
+
+  @Override
+  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback,
+      SamzaApplicationState state) {
+    return new MockClusterResourceManager(callback);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
index db70c38..3987288 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.clustermanager;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
new file mode 100644
index 0000000..264966d
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.MockSystemFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests for {@link ClusterBasedJobCoordinator}
+ */
+public class TestClusterBasedJobCoordinator {
+
+  Map<String, String> configMap;
+
+  @Before
+  public void setUp() throws NoSuchFieldException, NoSuchMethodException {
+    configMap = new HashMap<>();
+    configMap.put("job.name", "test-job");
+    configMap.put("job.coordinator.system", "kafka");
+    configMap.put("task.inputs", "kafka.topic1");
+    configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
+    configMap.put("samza.cluster-manager.factory", "org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
+
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
+  }
+
+  @After
+  public void tearDown() {
+    MockSystemFactory.MSG_QUEUES.clear();
+  }
+
+  @Test
+  public void testPartitionCountMonitorWithDurableStates()
+      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    configMap.put("stores.mystore.changelog", "mychangelog");
+    Config config = new MapConfig(configMap);
+
+    // mimic job runner code to write the config to coordinator stream
+    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
+    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    producer.writeConfig("test-job", config);
+
+    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+
+    // change the input system stream metadata
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
+
+    StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
+    monitor.updatePartitionCountMetric();
+    assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.FAILED);
+  }
+
+  @Test
+  public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
+    Config config = new MapConfig(configMap);
+
+    // mimic job runner code to write the config to coordinator stream
+    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
+    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    producer.writeConfig("test-job", config);
+
+    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+
+    // change the input system stream metadata
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
+
+    StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
+    monitor.updatePartitionCountMetric();
+    assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.UNDEFINED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 1e9d372..734043a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -73,16 +73,16 @@ public class TestContainerAllocator {
   private static Config getConfig() {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
-        put("yarn.container.count", "1");
-        put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-        put("yarn.container.memory.mb", "512");
+        put("cluster-manager.container.count", "1");
+        put("cluster-manager.container.retry.count", "1");
+        put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.allocator.sleep.ms", "10");
+        put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
         put("task.inputs", "test-system.test-stream");
+        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
         put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("yarn.container.retry.count", "1");
-        put("yarn.container.retry.window.ms", "1999999999");
-        put("yarn.allocator.sleep.ms", "10");
       }
     });
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 4288aea..e252b7d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -54,17 +54,17 @@ public class TestContainerProcessManager {
 
   private Map<String, String> configVals = new HashMap<String, String>()  {
     {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
+      put("cluster-manager.container.count", "1");
+      put("cluster-manager.container.retry.count", "1");
+      put("cluster-manager.container.retry.window.ms", "1999999999");
+      put("cluster-manager.allocator.sleep.ms", "1");
+      put("cluster-manager.container.request.timeout.ms", "2");
+      put("cluster-manager.container.memory.mb", "512");
       put("yarn.package.path", "/foo");
       put("task.inputs", "test-system.test-stream");
+      put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
       put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
       put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.allocator.sleep.ms", "1");
-      put("yarn.container.request.timeout.ms", "2");
     }
   };
   private Config config = new MapConfig(configVals);
@@ -117,8 +117,8 @@ public class TestContainerProcessManager {
   public void testContainerProcessManager() throws Exception {
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfig());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
+    conf.put("cluster-manager.container.memory.mb", "500");
+    conf.put("cluster-manager.container.cpu.cores", "5");
 
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     ContainerProcessManager taskManager = new ContainerProcessManager(
@@ -137,8 +137,8 @@ public class TestContainerProcessManager {
 
     conf.clear();
     conf.putAll(getConfigWithHostAffinity());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
+    conf.put("cluster-manager.container.memory.mb", "500");
+    conf.put("cluster-manager.container.cpu.cores", "5");
 
     state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
     taskManager = new ContainerProcessManager(

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 32ec2d2..00198e9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -328,18 +328,18 @@ public class TestHostAwareContainerAllocator {
   private static Config getConfig() {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
-        put("yarn.container.count", "1");
-        put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-        put("yarn.container.memory.mb", "512");
+        put("cluster-manager.container.count", "1");
+        put("cluster-manager.container.retry.count", "1");
+        put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.container.request.timeout.ms", "3");
+        put("cluster-manager.allocator.sleep.ms", "1");
+        put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
         put("task.inputs", "test-system.test-stream");
+        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
         put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("yarn.container.retry.count", "1");
-        put("yarn.container.retry.window.ms", "1999999999");
-        put("yarn.samza.host-affinity.enabled", "true");
-        put("yarn.container.request.timeout.ms", "3");
-        put("yarn.allocator.sleep.ms", "1");
+        put("job.host-affinity.enabled", "true");
       }
     });
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index b7514c4..6a69889 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -49,7 +49,7 @@ public class JobModelManagerTestUtil {
       containers.put(String.valueOf(i), container);
     }
     JobModel jobModel = new JobModel(config, containers, localityManager);
-    return new JobModelManager(jobModel, server, null);
+    return new JobModelManager(jobModel, server);
   }
 
   public static JobModelManager getJobModelManagerUsingReadModel(Config config, int containerCount, StreamMetadataCache streamMetadataCache,
@@ -59,7 +59,7 @@ public class JobModelManagerTestUtil {
       containerIds.add(String.valueOf(i));
     }
     JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), locManager, streamMetadataCache, containerIds);
-    return new JobModelManager(jobModel, server, null);
+    return new JobModelManager(jobModel, server);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 1d6fc65..3130ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -85,16 +85,16 @@ public class TestJobModelManager {
   public void testLocalityMapWithHostAffinity() {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
-        put("yarn.container.count", "1");
-        put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-        put("yarn.container.memory.mb", "512");
+        put("cluster-manager.container.count", "1");
+        put("cluster-manager.container.memory.mb", "512");
+        put("cluster-manager.container.retry.count", "1");
+        put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.allocator.sleep.ms", "10");
         put("yarn.package.path", "/foo");
         put("task.inputs", "test-system.test-stream");
+        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
         put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("yarn.container.retry.count", "1");
-        put("yarn.container.retry.window.ms", "1999999999");
-        put("yarn.allocator.sleep.ms", "10");
         put("job.host-affinity.enabled", "true");
       }
     });
@@ -111,16 +111,16 @@ public class TestJobModelManager {
   public void testLocalityMapWithoutHostAffinity() {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
-        put("yarn.container.count", "1");
-        put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-        put("yarn.container.memory.mb", "512");
+        put("cluster-manager.container.count", "1");
+        put("cluster-manager.container.memory.mb", "512");
+        put("cluster-manager.container.retry.count", "1");
+        put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.allocator.sleep.ms", "10");
         put("yarn.package.path", "/foo");
         put("task.inputs", "test-system.test-stream");
+        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
         put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("yarn.container.retry.count", "1");
-        put("yarn.container.retry.window.ms", "1999999999");
-        put("yarn.allocator.sleep.ms", "10");
         put("job.host-affinity.enabled", "false");
       }
     });

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
deleted file mode 100644
index 07c4a24..0000000
--- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStreamPartition;
-
-public class MockSystemConsumer implements SystemConsumer {
-  public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
-  private boolean flag = true; // flag to make sure the messages only are
-                               // returned once
-
-  @Override
-  public void start() {}
-
-  @Override
-  public void stop() {}
-
-  @Override
-  public void register(SystemStreamPartition systemStreamPartition, String offset) {}
-
-  @Override
-  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
-    if (flag) {
-      ArrayList<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
-      list.add(TestStorageRecovery.msg);
-      messages.put(TestStorageRecovery.ssp, list);
-      flag = false;
-      return messages; 
-    } else {
-      messages.clear();
-      return messages;
-    }
-  }
-}
\ No newline at end of file

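The deleted mock consumer's "deliver each seeded message exactly once" behaviour is now provided by the shared org.apache.samza.system.MockSystemFactory added later in this patch. A minimal sketch of the equivalent usage (system/stream names are placeholders; the snippet assumes the usual JUnit, java.util and org.apache.samza.system imports):

  @Test
  public void pollsSeededMessageExactlyOnce() throws Exception {
    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "input", new Partition(0));
    IncomingMessageEnvelope msg = new IncomingMessageEnvelope(ssp, null, "key", "value");
    // Seed the shared queue instead of hard-coding the message in a bespoke SystemConsumer.
    MockSystemFactory.MSG_QUEUES.put(ssp, new ArrayList<>(Collections.singletonList(msg)));

    SystemConsumer consumer = new MockSystemFactory().getConsumer("test-system", new MapConfig(), null);
    consumer.register(ssp, "0");

    // The first poll drains the seeded entry; the mock removes it from MSG_QUEUES ...
    assertEquals(1, consumer.poll(Collections.singleton(ssp), 100).get(ssp).size());
    // ... so a second poll returns an empty list, matching the old MockSystemConsumer's one-shot behaviour.
    assertTrue(consumer.poll(Collections.singleton(ssp), 100).get(ssp).isEmpty());
  }
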
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
deleted file mode 100644
index 7abf82b..0000000
--- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
-
-public class MockSystemFactory implements SystemFactory {
-
-  @Override
-  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-    return new MockSystemConsumer();
-  }
-
-  @Override
-  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
-    return null;
-  }
-
-  @Override
-  public SystemAdmin getAdmin(String systemName, Config config) {
-    return TestStorageRecovery.systemAdmin;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index 21d0150..7c1647e 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -19,34 +19,25 @@
 
 package org.apache.samza.storage;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.MockSystemFactory;
 import org.apache.samza.system.SystemStreamPartition;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestStorageRecovery {
 
-  public static SystemAdmin systemAdmin = null;
   public Config config = null;
-  public SystemStreamMetadata systemStreamMetadata = null;
-  public SystemStreamMetadata inputSystemStreamMetadata = null;
   private static final String SYSTEM_STREAM_NAME = "changelog";
   private static final String INPUT_STREAM = "input";
   private static final String STORE_NAME = "testStore";
@@ -57,16 +48,6 @@ public class TestStorageRecovery {
   public void setup() throws InterruptedException {
     putConfig();
     putMetadata();
-
-    systemAdmin = mock(SystemAdmin.class);
-
-    Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
-    Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
-    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<>();
-    ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
-    ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
-    when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);
-    when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
   }
 
   @After
@@ -106,15 +87,9 @@ public class TestStorageRecovery {
   }
 
   private void putMetadata() {
-    SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2");
-    HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>();
-    map.put(new Partition(0), sspm);
-    map.put(new Partition(1), sspm);
-    systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map);
-
-    HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>();
-    map1.put(new Partition(0), sspm);
-    map1.put(new Partition(1), sspm);
-    inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1);
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } });
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(1)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } });
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(0)), new ArrayList<>());
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(1)), new ArrayList<>());
   }
 }

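With the mocked SystemAdmin removed, the partition layout the recovery test sees is derived entirely from the SystemStreamPartition keys seeded above. A rough sketch of what the shared mock's admin reports for that seeding (system/stream names as in the test; the snippet assumes the usual java.util and org.apache.samza.system imports):

  SystemAdmin admin = new MockSystemFactory().getAdmin("mockSystem", new MapConfig());
  Map<String, SystemStreamMetadata> metadata =
      admin.getSystemStreamMetadata(new HashSet<>(Arrays.asList("changelog", "input")));

  // Both streams report partitions 0 and 1, because those are exactly the keys put into MSG_QUEUES
  // in putMetadata(); offsets are empty strings in this mock, which is enough for the recovery path.
  int changelogPartitions = metadata.get("changelog").getSystemStreamPartitionMetadata().size(); // 2
  int inputPartitions = metadata.get("input").getSystemStreamPartitionMetadata().size();         // 2
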
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
new file mode 100644
index 0000000..a9c57da
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * A mock system backed by a set of in-memory queues. Used for testing without an actual external messaging system.
+ */
+public class MockSystemFactory implements SystemFactory {
+
+  public static final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> MSG_QUEUES = new ConcurrentHashMap<>();
+
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new SystemConsumer() {
+      public void start() {
+      }
+
+      public void stop() {
+      }
+
+      public void register(SystemStreamPartition systemStreamPartition, String offset) {
+        MSG_QUEUES.putIfAbsent(systemStreamPartition, new ArrayList<>());
+      }
+
+      public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> retQueues = new HashMap<>();
+        systemStreamPartitions.forEach(ssp -> {
+            List<IncomingMessageEnvelope> msgs = MSG_QUEUES.get(ssp);
+            if (msgs == null) {
+              retQueues.put(ssp, new ArrayList<>());
+            } else {
+              retQueues.put(ssp, MSG_QUEUES.remove(ssp));
+            }
+          });
+        return retQueues;
+      }
+    };
+  }
+
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new SystemProducer() {
+      private final Random seed = new Random(System.currentTimeMillis());
+
+      @Override
+      public void start() {
+
+      }
+
+      @Override
+      public void stop() {
+
+      }
+
+      @Override
+      public void register(String source) {
+      }
+
+      @Override
+      public void send(String source, OutgoingMessageEnvelope envelope) {
+        SystemStream systemStream = envelope.getSystemStream();
+        List<SystemStreamPartition> sspForSystem = MSG_QUEUES.keySet().stream()
+            .filter(ssp -> ssp.getSystemStream().equals(systemStream))
+            .collect(ArrayList::new, (l, ssp) -> l.add(ssp), (l1, l2) -> l1.addAll(l2));
+        if (sspForSystem.isEmpty()) {
+          MSG_QUEUES.putIfAbsent(new SystemStreamPartition(systemStream, new Partition(0)), new ArrayList<>());
+          sspForSystem.add(new SystemStreamPartition(systemStream, new Partition(0)));
+        }
+        int partitionCount = sspForSystem.size();
+        int partitionId = envelope.getPartitionKey() == null ?
+            envelope.getKey() == null ? this.seed.nextInt(partitionCount) : envelope.getKey().hashCode() % partitionCount :
+            envelope.getPartitionKey().hashCode() % partitionCount;
+        SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partitionId));
+        List<IncomingMessageEnvelope> msgQueue = MSG_QUEUES.get(ssp);
+        msgQueue.add(new IncomingMessageEnvelope(ssp, null, envelope.getKey(), envelope.getMessage()));
+      }
+
+      @Override
+      public void flush(String source) {
+
+      }
+    };
+  }
+
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new ExtendedSystemAdmin() {
+
+      @Override
+      public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+        return null;
+      }
+
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+        Map<String, SystemStreamMetadata> metadataMap = new HashMap<>();
+        Map<String, Set<Partition>> partitionMap = MSG_QUEUES.entrySet()
+            .stream()
+            .filter(entry -> streamNames.contains(entry.getKey().getSystemStream().getStream()))
+            .map(e -> e.getKey()).<Map<String, Set<Partition>>>collect(HashMap::new, (m, ssp) -> {
+                if (m.get(ssp.getStream()) == null) {
+                  m.put(ssp.getStream(), new HashSet<>());
+                }
+                m.get(ssp.getStream()).add(ssp.getPartition());
+              }, (m1, m2) -> {
+                m2.forEach((k, v) -> {
+                    if (m1.get(k) == null) {
+                      m1.put(k, v);
+                    } else {
+                      m1.get(k).addAll(v);
+                    }
+                  });
+              });
+
+        partitionMap.forEach((k, v) -> {
+            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaMap =
+                v.stream().<Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>>collect(HashMap::new,
+                  (m, p) -> {
+                    m.put(p, new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+                  }, (m1, m2) -> m1.putAll(m2));
+
+            metadataMap.put(k, new SystemStreamMetadata(k, partitionMetaMap));
+          });
+
+        return metadataMap;
+      }
+
+      @Override
+      public Integer offsetComparator(String offset1, String offset2) {
+        return null;
+      }
+
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
+        return getSystemStreamMetadata(streamNames);
+      }
+
+      @Override
+      public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
+        return null;
+      }
+
+      @Override
+      public boolean createStream(StreamSpec streamSpec) {
+        return true;
+      }
+
+      @Override
+      public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+
+      }
+    };
+  }
+}
\ No newline at end of file

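A short usage sketch for the new shared mock (names are placeholders): tests register the factory through the usual systems.<name>.samza.factory config, seed MSG_QUEUES with the partitions the "topic" should expose, and both metadata and produced messages flow through those entries. The snippet assumes the usual java.util and org.apache.samza.system imports.

  // Two seeded partitions => the admin reports a partition count of 2 for test-stream.
  MockSystemFactory.MSG_QUEUES.put(
      new SystemStreamPartition("test-system", "test-stream", new Partition(0)), new ArrayList<>());
  MockSystemFactory.MSG_QUEUES.put(
      new SystemStreamPartition("test-system", "test-stream", new Partition(1)), new ArrayList<>());

  Config config = new MapConfig(Collections.singletonMap(
      "systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"));

  // Messages sent through the mock producer land back in MSG_QUEUES, partitioned by the key's hash.
  SystemProducer producer = new MockSystemFactory().getProducer("test-system", config, null);
  producer.send("source",
      new OutgoingMessageEnvelope(new SystemStream("test-system", "test-stream"), "key", "value"));
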
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index e6b148b..f092d75 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -26,8 +26,7 @@ import org.apache.samza.job.local.ProcessJobFactory
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.util.Util
-import org.junit.After
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -36,10 +35,7 @@ import org.apache.samza.config.TaskConfig
 import org.apache.samza.config.SystemConfig
 import org.apache.samza.container.SamzaContainer
 import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.Config
 import org.apache.samza.system._
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.job.model.JobModel
@@ -126,12 +122,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
     assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
 
-    // Check the status of Stream Partition Count Monitor
-    assertNotNull(coordinator.streamPartitionCountMonitor)
-    assertTrue(coordinator.streamPartitionCountMonitor.isRunning())
-
     coordinator.stop
-    assertFalse(coordinator.streamPartitionCountMonitor.isRunning())
   }
 
   @Test
@@ -288,54 +279,16 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     }}.toMap
   }
 
+  @Before
+  def setUp() {
+    // set up the test stream metadata
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]());
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]());
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]());
+  }
 
   @After
   def tearDown() = {
     MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
   }
-}
-
-class MockSystemFactory extends SystemFactory {
-  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = new SystemConsumer {
-    def start() {}
-    def stop() {}
-    def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
-    def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = new java.util.HashMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
-  }
-  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null
-  def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
-}
-
-class MockSystemAdmin extends ExtendedSystemAdmin {
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
-  def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = {
-    assertEquals(1, streamNames.size)
-    val partitionMetadata = Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, null),
-      new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null),
-      // Create a new Partition(2), which wasn't in the prior changelog mapping.
-      new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null))
-    Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava
-  }
-
-  override def offsetComparator(offset1: String, offset2: String) = null
-
-  override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
-                                              cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
-    assertEquals(1, streamNames.size())
-    val result = streamNames.asScala.map {
-      stream =>
-        val partitionMetadata = Map(
-          new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
-        )
-        stream -> new SystemStreamMetadata(stream, partitionMetadata.asJava)
-    }.toMap
-    result.asJava
-  }
-
-  override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null
-
-
-}
+}
\ No newline at end of file

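Since the shared mock derives partition counts from whatever SystemStreamPartition keys are present in MSG_QUEUES, a test that wanted to exercise the new partition-change handling could simulate a topic expansion simply by adding another key after the coordinator has read the initial metadata (a sketch only; the updated test above does not do this):

  // setUp() above seeds partitions 0-2 of test/stream1; adding a fourth key makes the mock admin
  // report 4 partitions on the next metadata fetch, which is what StreamPartitionCountMonitor samples.
  MockSystemFactory.MSG_QUEUES.put(
      new SystemStreamPartition("test", "stream1", new Partition(3)),
      new ArrayList<IncomingMessageEnvelope>());
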
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index 7d15fd8..c7eab3b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
 class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
 
   @Test
-  def testStreamPartitionCountMonitor(): Unit = {
+  def testStreamPartitionCountChange(): Unit = {
     val mockMetadataCache = mock[StreamMetadataCache]
     val inputSystemStream = new SystemStream("test-system", "test-stream")
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
@@ -70,11 +70,79 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
 
     val metrics = new MetricsRegistryMap()
 
+    val mockCallback = mock[StreamPartitionCountMonitor.Callback]
+
+    val partitionCountMonitor = new StreamPartitionCountMonitor(
+      inputSystemStreamSet.asJava,
+      mockMetadataCache,
+      metrics,
+      5,
+      mockCallback
+    )
+
+    partitionCountMonitor.updatePartitionCountMetric()
+
+    assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
+    assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
+
+    assertNotNull(metrics.getGroup("job-coordinator"))
+
+    val metricGroup = metrics.getGroup("job-coordinator")
+    assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
+    assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+
+    verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())
+
+  }
+
+  @Test
+  def testStreamPartitionCountException(): Unit = {
+    val mockMetadataCache = mock[StreamMetadataCache]
+    val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputExceptionStream = new SystemStream("test-system", "test-exception-stream")
+    val inputSystemStreamSet = Set[SystemStream](inputSystemStream, inputExceptionStream)
+
+    val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+        put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+      }
+    }
+
+    val finalPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        putAll(initialPartitionMetadata)
+        put(new Partition(2), new SystemStreamPartitionMetadata("", "", ""))
+      }
+    }
+    val streamMockMetadata = mock[java.util.HashMap[Partition, SystemStreamPartitionMetadata]]
+
+    val initialMetadata = Map(
+      inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, initialPartitionMetadata),
+      inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+    )
+    val finalMetadata = Map(
+      inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, streamMockMetadata),
+      inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, finalPartitionMetadata)
+    )
+
+    when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
+      .thenReturn(initialMetadata)  // Called during StreamPartitionCountMonitor instantiation
+      .thenReturn(finalMetadata)  // Called from monitor thread the second time
+
+    // Make the stream metadata lookup for {@code inputExceptionStream} fail with a runtime exception
+    when(streamMockMetadata.keySet()).thenThrow(new RuntimeException)
+
+    val metrics = new MetricsRegistryMap()
+
+    val mockCallback = mock[StreamPartitionCountMonitor.Callback]
+
     val partitionCountMonitor = new StreamPartitionCountMonitor(
       inputSystemStreamSet.asJava,
       mockMetadataCache,
       metrics,
-      5
+      5,
+      mockCallback
     )
 
     partitionCountMonitor.updatePartitionCountMetric()
@@ -87,6 +155,10 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val metricGroup = metrics.getGroup("job-coordinator")
     assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
     assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+
+    // Make sure the callback is invoked as long as a partition count change is detected on at least one input stream
+    verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())
+
   }
 
   @Test
@@ -98,7 +170,8 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
       inputSystemStreamSet.asJava,
       mockMetadataCache,
       new MetricsRegistryMap(),
-      50
+      50,
+      null
     )
 
     assertFalse(monitor.isRunning())
@@ -143,7 +216,8 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
       inputSystemStreamSet.asJava,
       mockMetadataCache,
       new MetricsRegistryMap(),
-      50
+      50,
+      null
     ) {
       override def updatePartitionCountMetric(): Unit = {
         sampleCount.countDown()

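In Java terms, the constructor change exercised by these tests is the new fifth argument carrying the callback. A rough sketch of the wiring (stubbedMetadataCache() is a hypothetical helper standing in for the Mockito stubbing the Scala tests do; the 50 interval matches the lifecycle tests above):

  // Hypothetical helper: returns a StreamMetadataCache stubbed to serve metadata for the input streams,
  // equivalent to the when(mockMetadataCache.getStreamMetadata(...)) setup in the Scala tests above.
  StreamMetadataCache metadataCache = stubbedMetadataCache();

  Set<SystemStream> inputStreams = Collections.singleton(new SystemStream("test-system", "test-stream"));
  StreamPartitionCountMonitor.Callback callback = mock(StreamPartitionCountMonitor.Callback.class);

  // New 5-argument constructor: input streams, metadata cache, metrics registry, monitor interval, callback.
  StreamPartitionCountMonitor monitor = new StreamPartitionCountMonitor(
      inputStreams, metadataCache, new MetricsRegistryMap(), 50, callback);

  monitor.updatePartitionCountMetric();
  // When a sampled partition count differs from the count captured at construction time, the monitor
  // updates the <system>-<stream>-partitionCount gauge and invokes callback.onSystemStreamPartitionChange(...).
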
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
index a6d82e1..3d385d6 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
@@ -20,7 +20,6 @@ package org.apache.samza.system
 
 import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.config._
-import org.apache.samza.coordinator.MockSystemFactory
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
index b7f2119..255c85e 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
@@ -20,7 +20,6 @@ package org.apache.samza.system
 
 import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.config._
-import org.apache.samza.coordinator.MockSystemFactory
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._