You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/23 06:07:50 UTC

[samza] branch master updated: SAMZA-2260: Standalone - coordinator stream metadata store lifecycle (#1090)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 106286f  SAMZA-2260: Standalone - coordinator stream metadata store lifecycle (#1090)
106286f is described below

commit 106286fc2161bc093f85bf785d56530467ceff45
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Mon Jul 22 23:07:44 2019 -0700

    SAMZA-2260: Standalone - coordinator stream metadata store lifecycle (#1090)
    
    * SAMZA-2260: Standalone and coordinator metadata store lifecycle
    
    * Trigger build again
    
    * Address PR comments from @shanthoosh
---
 .../coordinator/AzureJobCoordinatorFactory.java    |   4 +-
 .../samza/coordinator/JobCoordinatorFactory.java   |   5 +-
 .../apache/samza/processor/StreamProcessor.java    |  74 ++++++++++++---
 .../samza/runtime/LocalApplicationRunner.java      |  78 +++++++++++----
 .../PassthroughJobCoordinatorFactory.java          |   4 +-
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  71 ++++++--------
 .../apache/samza/zk/ZkJobCoordinatorFactory.java   |  17 ++--
 .../samza/processor/TestStreamProcessor.java       |  35 ++++---
 .../samza/runtime/TestLocalApplicationRunner.java  | 105 ++++++++++++++++++---
 .../org/apache/samza/zk/TestZkJobCoordinator.java  |  38 +++++---
 .../sql/runner/SamzaSqlApplicationRunner.java      |   4 +-
 .../apache/samza/sql/util/SamzaSqlTestConfig.java  |   6 +-
 .../apache/samza/test/framework/TestRunner.java    |   2 +-
 .../samza/processor/TestZkStreamProcessorBase.java |   5 +-
 .../samzasql/SamzaSqlIntegrationTestHarness.java   |  24 ++++-
 15 files changed, 341 insertions(+), 131 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
index ff8925a..206e0d4 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
@@ -20,11 +20,13 @@
 package org.apache.samza.coordinator;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 
 public class AzureJobCoordinatorFactory implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+      MetadataStore metadataStore) {
     return new AzureJobCoordinator(processorId, config, metricsRegistry);
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 8f3d96e..b555bae 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.samza.coordinator;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 
 @InterfaceStability.Evolving
@@ -29,7 +30,9 @@ public interface JobCoordinatorFactory {
    * @param processorId a unique logical identifier assigned to the {@link org.apache.samza.processor.StreamProcessor}.
    * @param config the configuration of the samza application.
    * @param metricsRegistry  used to publish the coordination specific metrics.
+   * @param metadataStore used to read and write metadata for the samza application.
    * @return the {@link JobCoordinator} instance.
    */
-  JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry);
+  JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+      MetadataStore metadataStore);
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 9877a62..815725e 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -51,10 +51,13 @@ import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.ReflectionUtil;
@@ -133,6 +136,7 @@ public class StreamProcessor {
   private final ExecutorService containerExcecutorService;
   private final Object lock = new Object();
   private final MetricsRegistryMap metricsRegistry;
+  private final MetadataStore metadataStore;
 
   private volatile Throwable containerException = null;
 
@@ -181,9 +185,8 @@ public class StreamProcessor {
    * @param customMetricsReporters registered with the metrics system to report metrics.
    * @param taskFactory the task factory to instantiate the Task.
    * @param processorListener listener to the StreamProcessor life cycle.
-   *
-   * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
-   * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+   * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
    */
   @Deprecated
   public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
@@ -193,7 +196,7 @@ public class StreamProcessor {
 
   /**
    * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
-   * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)}, with the following differences:
    * <ol>
    *   <li>Passes null for application-defined context factories</li>
    *   <li>Accepts a {@link ProcessorLifecycleListener} directly instead of a
@@ -206,15 +209,14 @@ public class StreamProcessor {
    * @param taskFactory task factory to instantiate the Task
    * @param processorListener listener to the StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
-   *
-   * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
-   * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+   * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
    */
   @Deprecated
   public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
     this(processorId, config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener,
-        jobCoordinator);
+        jobCoordinator, null);
   }
 
   /**
@@ -228,13 +230,44 @@ public class StreamProcessor {
    * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
    * @param externalContextOptional optional {@link ExternalContext} to pass through to the application
    * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
-   * @param jobCoordinator the instance of {@link JobCoordinator}
+   * @param jobCoordinator the instance of {@link JobCoordinator}. If null, the jobCoordinator instance will be created.
+   *                       If the jobCoordinator is passed in externally, the jobCoordinator and StreamProcessor may not
+   *                       share the same instance of the {@link MetadataStore}.
+   * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
    */
+  @Deprecated
   public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
       Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
       Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
       JobCoordinator jobCoordinator) {
+    this(processorId, config, customMetricsReporters, taskFactory, applicationDefinedContainerContextFactoryOptional,
+        applicationDefinedTaskContextFactoryOptional, externalContextOptional, listenerFactory,
+        jobCoordinator, null);
+  }
+
+  /**
+   * Builds a {@link StreamProcessor} with full specification of processing components.
+   *
+   * @param processorId a unique logical identifier assigned to the stream processor.
+   * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
+   * @param customMetricsReporters registered with the metrics system to report metrics
+   * @param taskFactory task factory to instantiate the Task
+   * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
+   * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
+   * @param externalContextOptional optional {@link ExternalContext} to pass through to the application
+   * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
+   * @param jobCoordinator the instance of {@link JobCoordinator}. If null, the jobCoordinator instance will be created.
+   *                       If the jobCoordinator is passed in externally, the jobCoordinator and StreamProcessor may not
+   *                       share the same instance of the {@link MetadataStore}.
+   * @param metadataStore the instance of {@link MetadataStore} used by managers such as {@link StartpointManager}
+   */
+  public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
+      Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
+      Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
+      Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
+      JobCoordinator jobCoordinator, MetadataStore metadataStore) {
     Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
     Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "ProcessorId cannot be null.");
     this.config = config;
@@ -249,7 +282,10 @@ public class StreamProcessor {
     this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
     this.externalContextOptional = externalContextOptional;
     this.taskShutdownMs = new TaskConfig(config).getShutdownMs();
-    this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
+    this.metadataStore = metadataStore;
+    this.jobCoordinator = (jobCoordinator != null)
+        ? jobCoordinator
+        : createJobCoordinator(config, processorId, metricsRegistry, metadataStore);
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
     ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
@@ -349,19 +385,28 @@ public class StreamProcessor {
       this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue());
     }
 
+    // Metadata store lifecycle managed outside of the SamzaContainer.
+    // All manager lifecycles are managed in the SamzaContainer including startpointManager
+    StartpointManager startpointManager = null;
+    if (metadataStore != null) {
+      startpointManager = new StartpointManager(metadataStore);
+    } else {
+      LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
+    }
+
     return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
         this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), getClass().getClassLoader(), null, null,
+        Option.apply(this.externalContextOptional.orElse(null)), getClass().getClassLoader(), null, startpointManager,
         diagnosticsManager);
   }
 
-  private JobCoordinator createJobCoordinator() {
+  private static JobCoordinator createJobCoordinator(Config config, String processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
     String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     JobCoordinatorFactory jobCoordinatorFactory =
-        ReflectionUtil.getObj(getClass().getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class);
-    return jobCoordinatorFactory.getJobCoordinator(processorId, config, metricsRegistry);
+        ReflectionUtil.getObj(StreamProcessor.class.getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class);
+    return jobCoordinatorFactory.getJobCoordinator(processorId, config, metricsRegistry, metadataStore);
   }
 
   /**
@@ -453,7 +498,6 @@ public class StreamProcessor {
           processorListener.afterFailure(containerException);
         else
           processorListener.afterStop();
-
       }
 
       @Override
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index adbb4c6..5549570 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -21,7 +21,6 @@ package org.apache.samza.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,12 +32,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
-import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
@@ -48,6 +49,7 @@ import org.apache.samza.context.ExternalContext;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.RunIdGenerator;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -75,12 +77,13 @@ public class LocalApplicationRunner implements ApplicationRunner {
   public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
 
   private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
-  private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
+  private final Set<Pair<StreamProcessor, MetadataStore>> processors = ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
   private final boolean isAppModeBatch;
   private final Optional<CoordinationUtils> coordinationUtils;
+  private final MetadataStoreFactory metadataStoreFactory;
   private Optional<String> runId = Optional.empty();
   private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
 
@@ -93,9 +96,21 @@ public class LocalApplicationRunner implements ApplicationRunner {
    * @param config configuration for the application
    */
   public LocalApplicationRunner(SamzaApplication app, Config config) {
+    this(app, config, new CoordinatorStreamMetadataStoreFactory());
+  }
+
+  /**
+   * Constructors a {@link LocalApplicationRunner} to run the {@code app} with the {@code config}.
+   *
+   * @param app application to run
+   * @param config configuration for the application
+   * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream.
+   */
+  public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) {
     this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-    isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
-    coordinationUtils = getCoordinationUtils(config, getClass().getClassLoader());
+    this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    this.coordinationUtils = getCoordinationUtils(config, getClass().getClassLoader());
+    this.metadataStoreFactory = metadataStoreFactory;
   }
 
   /**
@@ -104,8 +119,9 @@ public class LocalApplicationRunner implements ApplicationRunner {
   @VisibleForTesting
   LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
     this.appDesc = appDesc;
-    isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
     this.coordinationUtils = coordinationUtils;
+    this.metadataStoreFactory = new CoordinatorStreamMetadataStoreFactory();
   }
 
   private Optional<CoordinationUtils> getCoordinationUtils(Config config, ClassLoader classLoader) {
@@ -145,7 +161,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
     }
 
     try {
-      MetadataStore metadataStore = getMetadataStore();
+      MetadataStore metadataStore = getMetadataStoreForRunID();
       runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
       runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
     } catch (Exception e) {
@@ -172,14 +188,16 @@ public class LocalApplicationRunner implements ApplicationRunner {
       }
       jobConfigs.forEach(jobConfig -> {
           LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+          MetadataStore coordinatorStreamStore = createCoordinatorStreamStore(jobConfig);
+          coordinatorStreamStore.init();
           StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
-              sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext));
-          processors.add(processor);
+              sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext), coordinatorStreamStore);
+          processors.add(Pair.of(processor, coordinatorStreamStore));
         });
       numProcessorsToStart.set(processors.size());
 
       // start the StreamProcessors
-      processors.forEach(StreamProcessor::start);
+      processors.forEach(sp -> sp.getLeft().start());
     } catch (Throwable throwable) {
       cleanup();
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
@@ -191,7 +209,10 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
   @Override
   public void kill() {
-    processors.forEach(StreamProcessor::stop);
+    processors.forEach(sp -> {
+        sp.getLeft().stop();    // Stop StreamProcessor
+        sp.getRight().close();  // Close associated coordinator metadata store
+      });
     cleanup();
   }
 
@@ -231,7 +252,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
   @VisibleForTesting
   protected Set<StreamProcessor> getProcessors() {
-    return Collections.unmodifiableSet(processors);
+    return processors.stream().map(sp -> sp.getLeft()).collect(Collectors.toSet());
   }
 
   @VisibleForTesting
@@ -240,16 +261,26 @@ public class LocalApplicationRunner implements ApplicationRunner {
   }
 
   @VisibleForTesting
+  MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+    MetadataStore coordinatorStreamStore =
+        metadataStoreFactory.getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
+    return coordinatorStreamStore;
+  }
+
+  @VisibleForTesting
   StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
-      Optional<ExternalContext> externalContextOptional) {
+      Optional<ExternalContext> externalContextOptional, MetadataStore coordinatorStreamStore) {
     TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
     Map<String, MetricsReporter> reporters = new HashMap<>();
     String processorId = createProcessorId(new ApplicationConfig(config), getClass().getClassLoader());
     appDesc.getMetricsReporterFactories().forEach((name, factory) ->
         reporters.put(name, factory.getMetricsReporter(name, processorId, config)));
-    return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
-        appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null);
+
+    StreamProcessor streamProcessor = new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+          appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null, coordinatorStreamStore);
+
+    return streamProcessor;
   }
 
   /**
@@ -282,7 +313,13 @@ public class LocalApplicationRunner implements ApplicationRunner {
     coordinationUtils.ifPresent(CoordinationUtils::close);
   }
 
-  private MetadataStore getMetadataStore() {
+  /**
+   * This is not to be confused with the metadata store created from the member {@link #metadataStoreFactory}.
+   * The reason for the two Metadata store types (ZK and coordinator stream) is that the job model needs to be stored in
+   * ZK because of the versioning requirements. Configs and startpoints are stored in the coordinator stream. This
+   * disparity will be resolved with the next gen metadata store abstraction.
+   */
+  private MetadataStore getMetadataStoreForRunID() {
     String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY);
     MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
     return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
@@ -316,7 +353,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
     @Override
     public void afterStop() {
-      processors.remove(processor);
+      processors.removeIf(pair -> pair.getLeft().equals(processor));
 
       // successful shutdown
       handleProcessorShutdown(null);
@@ -324,13 +361,16 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.remove(processor);
+      processors.removeIf(pair -> pair.getLeft().equals(processor));
 
       // the processor stopped with failure, this is logging the first processor's failure as the cause of
       // the whole application failure
       if (failure.compareAndSet(null, t)) {
         // shutdown the other processors
-        processors.forEach(StreamProcessor::stop);
+        processors.forEach(sp -> {
+            sp.getLeft().stop();    // Stop StreamProcessor
+            sp.getRight().close();  // Close associated coordinator metadata store
+          });
       }
 
       // handle the current processor's shutdown failure.
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
index 5d5fecf..bacf312 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
@@ -21,11 +21,13 @@ package org.apache.samza.standalone;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 
 public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+      MetadataStore metadataStore) {
     return new PassthroughJobCoordinator(processorId, config, metricsRegistry);
   }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 167d458..b4a2481 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -60,11 +61,9 @@ import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -109,6 +108,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
   private final LocationId locationId;
   private final MetadataStore jobModelMetadataStore;
+  private final CoordinatorStreamStore coordinatorStreamStore;
 
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
@@ -124,11 +124,14 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   StreamPartitionCountMonitor streamPartitionCountMonitor = null;
 
-  ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore) {
+  ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore, MetadataStore coordinatorStreamStore) {
+    // TODO: When we consolidate metadata stores for standalone, this check can be removed. For now, we expect this type.
+    //   Keeping method signature as MetadataStore to avoid public API changes in the future
+    Preconditions.checkArgument(coordinatorStreamStore instanceof CoordinatorStreamStore);
+
     this.config = config;
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
     this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
-
     this.processorId = processorId;
     this.zkUtils = zkUtils;
     // setup a listener for a session state change
@@ -150,6 +153,7 @@ public class ZkJobCoordinator implements JobCoordinator {
             LocationIdProviderFactory.class);
     LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
     this.locationId = locationIdProvider.getLocationId();
+    this.coordinatorStreamStore = (CoordinatorStreamStore) coordinatorStreamStore;
     this.jobModelMetadataStore = jobModelMetadataStore;
   }
 
@@ -308,38 +312,33 @@ public class ZkJobCoordinator implements JobCoordinator {
    */
   @VisibleForTesting
   void loadMetadataResources(JobModel jobModel) {
-    CoordinatorStreamStore coordinatorStreamStore = null;
     try {
-      // Creates the coordinator stream if it does not exists.
-      coordinatorStreamStore = createCoordinatorStreamStore();
-      coordinatorStreamStore.init();
-
       MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, getClass().getClassLoader());
       metadataResourceUtil.createResources();
 
-      CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
-      NamespaceAwareCoordinatorStreamStore configStore =
-          new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
-      for (Map.Entry<String, String> entry : config.entrySet()) {
-        byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
-        configStore.put(entry.getKey(), serializedValue);
-      }
+      if (coordinatorStreamStore != null) {
+        // TODO: SAMZA-2273 - publish configs async
+        CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+        NamespaceAwareCoordinatorStreamStore configStore =
+            new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+          byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
+          configStore.put(entry.getKey(), serializedValue);
+        }
 
-      // fan out the startpoints
-      StartpointManager startpointManager = createStartpointManager(coordinatorStreamStore);
-      startpointManager.start();
-      try {
-        startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
-      } finally {
-        startpointManager.stop();
+        // fan out the startpoints
+        StartpointManager startpointManager = createStartpointManager();
+        startpointManager.start();
+        try {
+          startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+        } finally {
+          startpointManager.stop();
+        }
+      } else {
+        LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
       }
     } catch (IOException ex) {
       throw new SamzaException(String.format("IO exception while loading metadata resources."), ex);
-    } finally {
-      if (coordinatorStreamStore != null) {
-        LOG.info("Stopping the coordinator stream metadata store.");
-        coordinatorStreamStore.close();
-      }
     }
   }
 
@@ -349,17 +348,6 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   /**
-   * Creates a coordinator stream kafka topic.
-   */
-  @VisibleForTesting
-  CoordinatorStreamStore createCoordinatorStreamStore() {
-    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
-    SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
-    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
-    return new CoordinatorStreamStore(config, metrics.getMetricsRegistry());
-  }
-
-  /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
   @VisibleForTesting
@@ -398,8 +386,9 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   @VisibleForTesting
-  StartpointManager createStartpointManager(CoordinatorStreamStore metadataStore) {
-    return new StartpointManager(metadataStore);
+  StartpointManager createStartpointManager() {
+    // This method is for easy mocking.
+    return new StartpointManager(coordinatorStreamStore);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 8aacb96..2ee116e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -26,6 +26,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,20 +38,18 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   private static final String DEFAULT_JOB_NAME = "defaultJob";
   private static final String PROTOCOL_VERSION = "2.0";
 
-  /**
-   * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
-   *
-   * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
-   * @return An instance of {@link ZkJobCoordinator}
-   */
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+      MetadataStore coordinatorStreamStore) {
     // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
     String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
     ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
     LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
-    ZkMetadataStore metadataStore = new ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
-    return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils, metadataStore);
+
+    // TODO: This should be merged with coordinatorStreamStore - SAMZA-2272
+    ZkMetadataStore zkMetadataStore = new ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
+
+    return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils, zkMetadataStore, coordinatorStreamStore);
   }
 
   private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) {
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index b809ead..6d78b77 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.processor;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
@@ -29,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -40,6 +40,7 @@ import org.apache.samza.container.SamzaContainerStatus;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.processor.StreamProcessor.State;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
@@ -87,7 +88,7 @@ public class TestStreamProcessor {
     processorListenerState.clear();
   }
 
-  class TestableStreamProcessor extends StreamProcessor {
+  static class TestableStreamProcessor extends StreamProcessor {
     private final CountDownLatch containerStop = new CountDownLatch(1);
     private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
     private SamzaContainer container = null;
@@ -110,7 +111,9 @@ public class TestStreamProcessor {
         JobCoordinator jobCoordinator,
         SamzaContainer container,
         Duration runLoopShutdownDuration) {
-      super("TEST_PROCESSOR_ID", config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator);
+
+      super("TEST_PROCESSOR_ID", config, customMetricsReporters, streamTaskFactory, Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener,
+          jobCoordinator, Mockito.mock(MetadataStore.class));
       this.container = container;
       this.runLoopShutdownDuration = runLoopShutdownDuration;
     }
@@ -151,6 +154,7 @@ public class TestStreamProcessor {
     when(mockJobModel.getContainers()).thenReturn(containers);
     return mockJobModel;
   }
+
   /**
    * Tests stop() method when Container AND JobCoordinator are running
    */
@@ -217,7 +221,7 @@ public class TestStreamProcessor {
       }).when(mockJobCoordinator).start();
 
     processor.start();
-    processorListenerStart.await();
+    processorListenerStart.await(10, TimeUnit.SECONDS);
 
     assertEquals(SamzaContainerStatus.STARTED, processor.getContainerStatus());
 
@@ -406,7 +410,9 @@ public class TestStreamProcessor {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     Mockito.doNothing().when(mockJobCoordinator).start();
     ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
-    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = Mockito.spy(new StreamProcessor("TestProcessorId", new MapConfig(),
+        new HashMap<>(), null, Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener,
+        mockJobCoordinator, Mockito.mock(MetadataStore.class)));
     assertEquals(State.NEW, streamProcessor.getState());
     streamProcessor.start();
 
@@ -425,7 +431,8 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+        Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
 
     /**
      * Without a SamzaContainer running in StreamProcessor and current StreamProcessor state is STARTED,
@@ -493,7 +500,9 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
+    StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor("TestProcessorId", config, new HashMap<>(),
+        null, Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator,
+        Mockito.mock(MetadataStore.class)));
 
     Mockito.doNothing().when(mockJobCoordinator).stop();
     Mockito.doNothing().when(mockSamzaContainer).shutdown();
@@ -515,7 +524,8 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+        Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
 
     Exception failureException = new Exception("dummy exception");
 
@@ -536,7 +546,8 @@ public class TestStreamProcessor {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+        Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
 
     streamProcessor.state = State.RUNNING;
     streamProcessor.jobCoordinatorListener.onCoordinatorStop();
@@ -549,10 +560,10 @@ public class TestStreamProcessor {
   public void testStreamProcessorWithStreamProcessorListenerFactory() {
     AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>();
     StreamProcessor streamProcessor =
-        new StreamProcessor("TestProcessorId", mock(Config.class), new HashMap<>(), mock(TaskFactory.class), Optional.empty(),
-            Optional.empty(), Optional.empty(),
+        new StreamProcessor("TestProcessorId", mock(Config.class), new HashMap<>(), mock(TaskFactory.class),
+            Optional.empty(), Optional.empty(), Optional.empty(),
             sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)),
-            mock(JobCoordinator.class));
+            mock(JobCoordinator.class), Mockito.mock(MetadataStore.class));
     assertEquals(streamProcessor, mockListener.get().processor);
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index ea5f8a0..a5fd760 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -25,12 +25,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.samza.application.descriptors.ApplicationDescriptor;
-import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
@@ -41,9 +41,10 @@ import org.apache.samza.coordinator.ClusterMembership;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLock;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.task.IdentityStreamTask;
 import org.apache.samza.zk.ZkMetadataStore;
 import org.apache.samza.zk.ZkMetadataStoreFactory;
@@ -60,8 +61,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 @RunWith(PowerMockRunner.class)
@@ -94,6 +104,7 @@ public class TestLocalApplicationRunner {
     prepareTest();
 
     StreamProcessor sp = mock(StreamProcessor.class);
+    CoordinatorStreamStore metadataStore = mock(CoordinatorStreamStore.class);
 
     ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
         ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -108,11 +119,16 @@ public class TestLocalApplicationRunner {
 
     ExternalContext externalContext = mock(ExternalContext.class);
     doReturn(sp).when(runner)
-        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(
+            CoordinatorStreamStore.class));
+    doReturn(metadataStore).when(runner).createCoordinatorStreamStore(any(Config.class));
     doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
 
     runner.run(externalContext);
 
+    verify(metadataStore).init();
+    verify(metadataStore, never()).close();
+
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
 
@@ -127,6 +143,7 @@ public class TestLocalApplicationRunner {
     prepareTest();
 
     StreamProcessor sp = mock(StreamProcessor.class);
+    CoordinatorStreamStore metadataStore = mock(CoordinatorStreamStore.class);
 
     ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
         ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -139,11 +156,16 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.empty()));
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(),
+        captor.capture(), eq(Optional.empty()), any(CoordinatorStreamStore.class));
+    doReturn(metadataStore).when(runner).createCoordinatorStreamStore(any(Config.class));
     doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
 
     runner.run();
 
+    verify(metadataStore).init();
+    verify(metadataStore, never()).close();
+
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
 
@@ -162,6 +184,7 @@ public class TestLocalApplicationRunner {
     doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
 
     StreamProcessor sp = mock(StreamProcessor.class);
+    CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
     ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
         ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
 
@@ -175,11 +198,15 @@ public class TestLocalApplicationRunner {
 
     ExternalContext externalContext = mock(ExternalContext.class);
     doReturn(sp).when(runner)
-        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+    doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
 
     runner.run(externalContext);
     runner.waitForFinish();
 
+    verify(coordinatorStreamStore).init();
+    verify(coordinatorStreamStore, never()).close();
+
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
 
@@ -198,6 +225,7 @@ public class TestLocalApplicationRunner {
     doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
 
     StreamProcessor sp = mock(StreamProcessor.class);
+    CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
     ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
         ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
 
@@ -208,7 +236,9 @@ public class TestLocalApplicationRunner {
 
     ExternalContext externalContext = mock(ExternalContext.class);
     doReturn(sp).when(runner)
-        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(
+            CoordinatorStreamStore.class));
+    doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
 
     try {
       runner.run(externalContext);
@@ -217,10 +247,60 @@ public class TestLocalApplicationRunner {
       assertNotNull(th);
     }
 
+    verify(coordinatorStreamStore).init();
+    verify(coordinatorStreamStore, never()).close();
+
     assertEquals(runner.status(), ApplicationStatus.UnsuccessfulFinish);
   }
 
   @Test
+  public void testKill() throws Exception {
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+    CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+        ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStart();
+        return null;
+      }).when(sp).start();
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStop();
+        return null;
+      }).when(sp).stop();
+
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+    doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+    runner.run(externalContext);
+    runner.kill();
+
+    verify(coordinatorStreamStore).init();
+    verify(coordinatorStreamStore).close();
+
+    assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+  }
+
+  @Test
   public void testWaitForFinishReturnsBeforeTimeout() {
     long timeoutInMs = 1000;
 
@@ -351,7 +431,10 @@ public class TestLocalApplicationRunner {
     localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
     doReturn(localPlanner).when(runner).getPlanner(getClass().getClassLoader());
     StreamProcessor sp = mock(StreamProcessor.class);
-    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject());
+    CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject(), any(
+        CoordinatorStreamStore.class));
+    doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
   }
 
 }
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 94a45d6..1b3ee12 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -69,7 +69,8 @@ public class TestZkJobCoordinator {
 
   private final Config config;
   private final JobModel jobModel;
-  private final MetadataStore metadataStore;
+  private final MetadataStore zkMetadataStore;
+  private final CoordinatorStreamStore coordinatorStreamStore;
 
   public TestZkJobCoordinator() {
     Map<String, String> configMap = ImmutableMap.of(
@@ -85,7 +86,8 @@ public class TestZkJobCoordinator {
         new TaskName("t1"), new TaskModel(new TaskName("t1"), ssps, new Partition(0)));
     ContainerModel containerModel = new ContainerModel("0", tasksForContainer);
     jobModel = new JobModel(config, ImmutableMap.of("0", containerModel));
-    metadataStore = Mockito.mock(MetadataStore.class);
+    zkMetadataStore = Mockito.mock(MetadataStore.class);
+    coordinatorStreamStore = Mockito.mock(CoordinatorStreamStore.class);
   }
 
   @Test
@@ -99,7 +101,8 @@ public class TestZkJobCoordinator {
     when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
     when(zkUtils.getZkClient()).thenReturn(mockZkClient);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     doReturn(new JobModel(new MapConfig(), new HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -127,7 +130,8 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
     final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -153,7 +157,8 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
     final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -184,7 +189,8 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.streamPartitionCountMonitor = monitor;
@@ -207,7 +213,8 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -234,7 +241,8 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -257,9 +265,10 @@ public class TestZkJobCoordinator {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils, metadataStore));
-    doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
-    doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils,
+        zkMetadataStore, coordinatorStreamStore));
+    doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
     MetadataResourceUtil mockMetadataResourceUtil = mock(MetadataResourceUtil.class);
     doReturn(mockMetadataResourceUtil).when(zkJobCoordinator)
@@ -287,9 +296,10 @@ public class TestZkJobCoordinator {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils, metadataStore));
-    doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
-    doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config,
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
+    doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
     doReturn(jobModel).when(zkJobCoordinator).generateNewJobModel(any());
     doNothing().when(zkJobCoordinator).loadMetadataResources(jobModel);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 3ff170a..ce42bee 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -65,8 +65,8 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
     this(app, false, config);
   }
 
-  public SamzaSqlApplicationRunner(Boolean localRunner, Config config) {
-    this(new SamzaSqlApplication(), localRunner, config);
+  public SamzaSqlApplicationRunner(Boolean isLocalRunner, Config config) {
+    this(new SamzaSqlApplication(), isLocalRunner, config);
   }
 
   private SamzaSqlApplicationRunner(SamzaApplication app, Boolean localRunner, Config config) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index a5b03a0..3714fcc 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -56,6 +56,8 @@ public class SamzaSqlTestConfig {
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
   public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
   public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
+  public static final String SQL_JOB = "sql-job";
+  public static final String SQL_JOB_PROCESSOR_ID = "1";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
     return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -79,8 +81,8 @@ public class SamzaSqlTestConfig {
       boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
     HashMap<String, String> staticConfigs = new HashMap<>();
 
-    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
-    staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
+    staticConfigs.put(JobConfig.JOB_NAME(), SQL_JOB);
+    staticConfigs.put(JobConfig.PROCESSOR_ID(), SQL_JOB_PROCESSOR_ID);
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY, SingleContainerGrouperFactory.class.getName());
 
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 42ebf32..d1e5a8a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -283,7 +283,7 @@ public class TestRunner {
     // Cleaning store directories to ensure current run does not pick up state from previous run
     deleteStoreDirectories();
     Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config, new InMemoryMetadataStoreFactory());
     runner.run(externalContext);
     if (!runner.waitForFinish(timeout)) {
       throw new SamzaException("Timed out waiting for application to finish");
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 81ac963..6239c61 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -45,6 +45,7 @@ import org.apache.samza.config.ZkConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -63,6 +64,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
 import org.junit.Before;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,7 +135,8 @@ public class TestZkStreamProcessorBase extends IntegrationTestHarness {
     String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     JobCoordinator jobCoordinator =
         ReflectionUtil.getObj(getClass().getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class)
-            .getJobCoordinator(pId, config, new MetricsRegistryMap());
+            .getJobCoordinator(pId, config, new MetricsRegistryMap(), Mockito.mock(
+                CoordinatorStreamStore.class));
 
     ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {
       @Override
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
index 4ab543d..9ce1ea4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
@@ -18,14 +18,36 @@
  */
 package org.apache.samza.test.samzasql;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.system.MockSystemFactory;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.util.CoordinatorStreamUtil;
 
 
 public class SamzaSqlIntegrationTestHarness extends IntegrationTestHarness {
+
+  public static final String MOCK_METADATA_SYSTEM = "mockmetadatasystem";
+
   protected void runApplication(Config config) {
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, config);
+    // Use MockSystemFactory for the coordinator system
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition(MOCK_METADATA_SYSTEM,
+        CoordinatorStreamUtil.getCoordinatorStreamName(SamzaSqlTestConfig.SQL_JOB, SamzaSqlTestConfig.SQL_JOB_PROCESSOR_ID),
+        new Partition(0)), new ArrayList<>());
+    HashMap<String, String> mapConfig = new HashMap<>();
+    mapConfig.put(JobConfig.JOB_COORDINATOR_SYSTEM(), MOCK_METADATA_SYSTEM);
+    mapConfig.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_METADATA_SYSTEM), MockSystemFactory.class.getName());
+    mapConfig.putAll(config);
+
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(mapConfig));
     executeRun(runner, config);
     runner.waitForFinish();
   }