You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/01/26 01:28:18 UTC

[2/2] samza git commit: SAMZA-1548; Add start() and stop() to SystemAdmin

SAMZA-1548; Add start() and stop() to SystemAdmin

This patch adds start() and stop() to SystemAdmin interface. This can be useful for e.g. kafka.admin.AdminClient which needs to be started before it can be used.

Since we add this method in interface and expect AdminClient to be stateful and probably has its own thread, there will be higher cost to instantiate a new SystemAdmin. Thus we probably want to re-use the SystemAdmin instances instead of creating SystemAdmin on demand when needed. Therefore, this patch also adds SystemAdmins class to help manage a map from system to SystemAdmin, similar to the existing SystemProducers class in Samza.

Author: Dong Lin <li...@gmail.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #397 from lindong28/SAMZA-1548


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/75e70e56
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/75e70e56
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/75e70e56

Branch: refs/heads/master
Commit: 75e70e5697ae58e560dd84d0cc52df713386fe08
Parents: 2e04e17
Author: Dong Lin <li...@gmail.com>
Authored: Thu Jan 25 17:28:13 2018 -0800
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Thu Jan 25 17:28:13 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemAdmin.java    | 11 ++++
 .../autoscaling/deployer/ConfigManager.java     |  4 +-
 .../samza/coordinator/AzureJobCoordinator.java  | 12 +++-
 .../ClusterBasedJobCoordinator.java             | 18 +++---
 .../StreamPartitionCountMonitor.java            |  1 -
 .../stream/CoordinatorStreamSystemConsumer.java | 27 +++++++--
 .../stream/CoordinatorStreamSystemProducer.java | 31 +++++++---
 .../stream/CoordinatorStreamWriter.java         |  2 +-
 .../apache/samza/execution/StreamManager.java   | 15 ++---
 .../runtime/AbstractApplicationRunner.java      | 16 ++++-
 .../samza/runtime/LocalApplicationRunner.java   |  2 +
 .../samza/runtime/LocalContainerRunner.java     |  1 +
 .../samza/runtime/RemoteApplicationRunner.java  |  7 +--
 .../standalone/PassthroughJobCoordinator.java   | 29 +++------
 .../apache/samza/storage/StorageRecovery.java   | 45 ++++----------
 .../org/apache/samza/system/SystemAdmins.java   | 64 ++++++++++++++++++++
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 11 +++-
 .../apache/samza/checkpoint/OffsetManager.scala | 10 ++-
 .../apache/samza/container/SamzaContainer.scala | 39 +++++++-----
 .../apache/samza/container/TaskInstance.scala   |  6 +-
 .../samza/coordinator/JobModelManager.scala     | 60 +++++-------------
 .../stream/CoordinatorStreamSystemFactory.scala | 50 ---------------
 .../scala/org/apache/samza/job/JobRunner.scala  | 12 ++--
 .../samza/storage/TaskStorageManager.scala      | 15 ++---
 .../samza/system/StreamMetadataCache.scala      | 25 ++------
 .../system/chooser/BootstrappingChooser.scala   | 22 +++----
 .../samza/system/chooser/DefaultChooser.scala   | 12 ++--
 .../main/scala/org/apache/samza/util/Util.scala | 18 ++++--
 .../TestClusterBasedJobCoordinator.java         |  7 +--
 .../samza/execution/TestExecutionPlanner.java   |  3 +-
 .../execution/TestJobGraphJsonGenerator.java    |  5 +-
 .../samza/execution/TestStreamManager.java      |  7 ++-
 .../runtime/TestApplicationRunnerMain.java      |  2 +
 .../samza/checkpoint/TestOffsetManager.scala    | 17 +++---
 .../samza/container/TestSamzaContainer.scala    | 23 +++++--
 .../samza/container/TestTaskInstance.scala      |  4 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  4 +-
 .../TestStreamPartitionCountMonitor.scala       |  6 +-
 .../processor/StreamProcessorTestUtils.scala    |  6 +-
 .../samza/storage/TestTaskStorageManager.scala  |  2 +-
 .../samza/system/TestStreamMetadataCache.scala  | 10 +--
 .../chooser/TestBootstrappingChooser.scala      | 25 +++++---
 .../system/chooser/TestDefaultChooser.scala     |  5 +-
 .../kafka/KafkaCheckpointManager.scala          |  4 ++
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 11 ++++
 .../system/kafka/KafkaSystemConsumer.scala      |  2 +
 .../samza/rest/proxy/task/SamzaTaskProxy.java   |  4 +-
 .../sql/runner/SamzaSqlApplicationRunner.java   |  2 +
 48 files changed, 386 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index e765540..dce7030 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -28,6 +28,17 @@ import java.util.Set;
  * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
+
+  /*
+   * Start this SystemAdmin
+   */
+  default void start() {};
+
+  /*
+   * Stop this SystemAdmin
+   */
+  default void stop() {};
+
   /**
    * Fetches the offsets for the messages immediately after the supplied offsets
    * for a group of SystemStreamPartitions.

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index d1b532f..d709254 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -29,7 +29,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.job.model.ContainerModel;
@@ -111,8 +110,7 @@ public class ConfigManager {
     }
 
     this.config = config;
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
-    this.coordinatorStreamConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+    this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     this.yarnUtil = new YarnUtil(rmAddress, rmPort);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index ca3384d..2b65ae0 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -42,12 +42,14 @@ import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlobUtils;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.LeaseBlobManager;
+import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.TableUtils;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -91,6 +93,7 @@ public class AzureJobCoordinator implements JobCoordinator {
   private RenewLeaseScheduler renewLease;
   private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
   private StreamMetadataCache streamMetadataCache = null;
+  private SystemAdmins systemAdmins = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel jobModel = null;
 
@@ -124,9 +127,12 @@ public class AzureJobCoordinator implements JobCoordinator {
 
   @Override
   public void start() {
-
     LOG.info("Starting Azure job coordinator.");
-    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+
+    // The systemAdmins should be started before streamMetadataCache can be used. And it should be stopped when this coordinator is stopped.
+    systemAdmins = new SystemAdmins(config);
+    systemAdmins.start();
+    streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
     table.addProcessorEntity(INITIAL_STATE, processorId, false);
 
     // Start scheduler for heartbeating
@@ -164,6 +170,8 @@ public class AzureJobCoordinator implements JobCoordinator {
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
+
+    systemAdmins.stop();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 3d67cae..91b94f4 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,13 +19,11 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.PartitionChangeException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -37,10 +35,9 @@ import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SystemClock;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +134,8 @@ public class ClusterBasedJobCoordinator {
    */
   volatile private Exception coordinatorException = null;
 
+  private SystemAdmins systemAdmins = null;
+
   /**
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
    * run the jobcoordinator.
@@ -153,7 +152,9 @@ public class ClusterBasedJobCoordinator {
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
     state = new SamzaApplicationState(jobModelManager);
-    partitionMonitor = getPartitionCountMonitor(config);
+    // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
+    systemAdmins = new SystemAdmins(config);
+    partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabled();
 
@@ -186,6 +187,7 @@ public class ClusterBasedJobCoordinator {
       log.info("Starting Cluster Based Job Coordinator");
 
       containerProcessManager.start();
+      systemAdmins.start();
       partitionMonitor.start();
 
       boolean isInterrupted = false;
@@ -221,6 +223,7 @@ public class ClusterBasedJobCoordinator {
 
     try {
       partitionMonitor.stop();
+      systemAdmins.stop();
       containerProcessManager.stop();
     } catch (Throwable e) {
       log.error("Exception while stopping task manager {}", e);
@@ -242,9 +245,8 @@ public class ClusterBasedJobCoordinator {
     return jobModelManager;
   }
 
-  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
-    Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
-    StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());
+  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index 16e8221..65b266e 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -139,7 +139,6 @@ public class StreamPartitionCountMonitor {
               }
             }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
           }
-
           state = State.RUNNING;
           break;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index c343865..4bbf452 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -32,16 +32,19 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,18 +66,30 @@ public class CoordinatorStreamSystemConsumer {
   private final Object bootstrapLock = new Object();
   private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();
 
-  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+  public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
+    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
+    SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry);
+
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     this.systemConsumer = systemConsumer;
     this.systemAdmin = systemAdmin;
-    this.configMap = new HashMap();
+    this.configMap = new HashMap<>();
     this.isBootstrapped = false;
-    this.keySerde = keySerde;
-    this.messageSerde = messageSerde;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
+  // Used only for test
   public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
-    this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+    this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+    this.systemConsumer = systemConsumer;
+    this.systemAdmin = systemAdmin;
+    this.configMap = new HashMap<>();
+    this.isBootstrapped = false;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
   /**
@@ -124,6 +139,7 @@ public class CoordinatorStreamSystemConsumer {
     }
     log.info("Starting coordinator stream system consumer.");
     systemConsumer.start();
+    systemAdmin.start();
     isStarted = true;
   }
 
@@ -133,6 +149,7 @@ public class CoordinatorStreamSystemConsumer {
   public void stop() {
     log.info("Stopping coordinator stream system consumer.");
     systemConsumer.stop();
+    systemAdmin.stop();
     isStarted = false;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 36cf759..b984e73 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -27,12 +27,15 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,21 +53,31 @@ public class CoordinatorStreamSystemProducer {
   private final SystemAdmin systemAdmin;
   private boolean isStarted;
 
-  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
-    this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+
+  public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
+    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
+    SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry);
+    this.systemStream = coordinatorSystemStream;
+    this.systemProducer = systemProducer;
+    this.systemAdmin = systemAdmin;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
-  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+  // Used only for test
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
     this.systemStream = systemStream;
     this.systemProducer = systemProducer;
     this.systemAdmin = systemAdmin;
-    this.keySerde = keySerde;
-    this.messageSerde = messageSerde;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
   /**
    * Registers a source with the underlying SystemProducer.
-   * 
+   *
    * @param source
    *          The source to register.
    */
@@ -82,6 +95,7 @@ public class CoordinatorStreamSystemProducer {
     }
     log.info("Starting coordinator stream producer.");
     systemProducer.start();
+    systemAdmin.start();
     isStarted = true;
   }
 
@@ -91,12 +105,13 @@ public class CoordinatorStreamSystemProducer {
   public void stop() {
     log.info("Stopping coordinator stream producer.");
     systemProducer.stop();
+    systemAdmin.stop();
     isStarted = false;
   }
 
   /**
    * Serialize and send a coordinator stream message.
-   * 
+   *
    * @param message
    *          The message to send.
    */
@@ -119,7 +134,7 @@ public class CoordinatorStreamSystemProducer {
   /**
    * Helper method that sends a series of SetConfig messages to the coordinator
    * stream.
-   * 
+   *
    * @param source
    *          An identifier to denote which source is sending a message. This
    *          can be any arbitrary string.

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index 77594dc..daca6a0 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -47,7 +47,7 @@ public class CoordinatorStreamWriter {
 
 
   public CoordinatorStreamWriter(Config config) {
-    coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
+    coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 3028e5f..0a6eb83 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.*;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.util.Util;
@@ -46,10 +47,10 @@ import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 public class StreamManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
 
-  private final Map<String, SystemAdmin> sysAdmins;
+  private final SystemAdmins systemAdmins;
 
-  public StreamManager(Map<String, SystemAdmin> sysAdmins) {
-    this.sysAdmins = sysAdmins;
+  public StreamManager(SystemAdmins systemAdmins) {
+    this.systemAdmins = systemAdmins;
   }
 
   public void createStreams(List<StreamSpec> streams) {
@@ -59,7 +60,7 @@ public class StreamManager {
 
     for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
       String systemName = entry.getKey();
-      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
 
       for (StreamSpec stream : entry.getValue()) {
         LOGGER.info("Creating stream {} with partitions {} on system {}",
@@ -72,7 +73,7 @@ public class StreamManager {
   Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
     Map<String, Integer> streamToPartitionCount = new HashMap<>();
 
-    SystemAdmin systemAdmin = sysAdmins.get(systemName);
+    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
     if (systemAdmin == null) {
       throw new SamzaException(String.format("System %s does not exist.", systemName));
     }
@@ -106,7 +107,7 @@ public class StreamManager {
           .collect(Collectors.toSet());
       intStreams.forEach(stream -> {
           LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName());
-          sysAdmins.get(stream.getSystemName()).clearStream(stream);
+          systemAdmins.getSystemAdmin(stream.getSystemName()).clearStream(stream);
         });
 
       //Find checkpoint stream and clean up
@@ -126,7 +127,7 @@ public class StreamManager {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
           SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
           StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
-          sysAdmins.get(spec.getSystemName()).clearStream(spec);
+          systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
         }
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index b8a8ca1..609b0ec 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -23,7 +23,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ApplicationConfig.ApplicationMode;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
@@ -32,6 +31,7 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmins;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +50,12 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
   private final StreamManager streamManager;
+  private final SystemAdmins systemAdmins;
 
   public AbstractApplicationRunner(Config config) {
     super(config);
-    this.streamManager = new StreamManager(new JavaSystemConfig(config).getSystemAdmins());
+    this.systemAdmins = new SystemAdmins(config);
+    this.streamManager = new StreamManager(systemAdmins);
   }
 
   @Override
@@ -63,6 +65,16 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return getStreamSpec(streamId, physicalName);
   }
 
+  @Override
+  public void run(StreamApplication streamApp) {
+    systemAdmins.start();
+  }
+
+  @Override
+  public void kill(StreamApplication streamApp) {
+    systemAdmins.stop();
+  }
+
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index e9b6bc8..5c5ee84 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -147,6 +147,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
+      super.run(app);
       // 1. initialize and plan
       ExecutionPlan plan = getExecutionPlan(app);
 
@@ -181,6 +182,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     processors.forEach(StreamProcessor::stop);
+    super.kill(streamApp);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 6750ccd..998df8b 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -72,6 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
+    super.run(streamApp);
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
     container = SamzaContainer$.MODULE$.apply(

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 1ead841..ea218d0 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -25,7 +25,6 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -59,6 +58,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
+      super.run(app);
       // TODO: run.id needs to be set for standalone: SAMZA-1531
       // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
       String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
@@ -95,6 +95,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
+      super.kill(app);
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
     }
@@ -149,9 +150,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   }
 
   private Config getConfigFromPrevRun() {
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(
-        config, new MetricsRegistryMap());
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     consumer.register();
     consumer.start();
     consumer.bootstrap();

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 5147169..e45b778 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -18,11 +18,9 @@
  */
 package org.apache.samza.standalone;
 
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
@@ -30,15 +28,12 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Standalone Job Coordinator does not implement any leader elector module or cluster manager
@@ -111,21 +106,9 @@ public class PassthroughJobCoordinator implements JobCoordinator {
 
   @Override
   public JobModel getJobModel() {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-      }
-      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
-    }
-
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
-        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
-
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
+    systemAdmins.start();
     String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
 
     /** TODO:
@@ -134,8 +117,10 @@ public class PassthroughJobCoordinator implements JobCoordinator {
      TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
      (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
      */
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+    JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
         Collections.singletonList(containerId));
+    systemAdmins.stop();
+    return jobModel;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index a47183e..c55f21f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -40,7 +40,7 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
@@ -61,13 +61,12 @@ public class StorageRecovery extends CommandLine {
   private Config jobConfig;
   private int maxPartitionNumber = 0;
   private File storeBaseDir = null;
-  private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>();
-  private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
-  private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
-  private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
-  private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
-  private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
+  private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>();
+  private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>();
+  private Map<String, ContainerModel> containers = new HashMap<>();
+  private List<TaskStorageManager> taskStorageManagers = new ArrayList<>();
   private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
+  private SystemAdmins systemAdmins = null;
 
   /**
    * Construct the StorageRecovery
@@ -80,6 +79,7 @@ public class StorageRecovery extends CommandLine {
   StorageRecovery(Config config, String path) {
     jobConfig = config;
     storeBaseDir = new File(path, "state");
+    systemAdmins = new SystemAdmins(config);
   }
 
   /**
@@ -90,7 +90,6 @@ public class StorageRecovery extends CommandLine {
     log.info("setting up the recovery...");
 
     getContainerModels();
-    getSystemFactoriesAndAdmins();
     getChangeLogSystemStreamsAndStorageFactories();
     getChangeLogMaxPartitionNumber();
     getTaskStorageManagers();
@@ -104,11 +103,13 @@ public class StorageRecovery extends CommandLine {
 
     log.info("start recovering...");
 
+    systemAdmins.start();
     for (TaskStorageManager taskStorageManager : taskStorageManagers) {
       taskStorageManager.init();
       taskStorageManager.stopStores();
       log.debug("restored " + taskStorageManager.toString());
     }
+    systemAdmins.stop();
 
     log.info("successfully recovered in " + storeBaseDir.toString());
   }
@@ -123,27 +124,6 @@ public class StorageRecovery extends CommandLine {
   }
 
   /**
-   * get the SystemFactories and SystemAdmins specified in the config file and
-   * put them into the maps
-   */
-  private void getSystemFactoriesAndAdmins() {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig);
-    List<String> systems = systemConfig.getSystemNames();
-
-    for (String system : systems) {
-      String systemFactory = systemConfig.getSystemFactory(system);
-      if (systemFactory == null) {
-        throw new SamzaException("A stream uses system " + system + " which is missing from the configuration.");
-      }
-      systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory));
-      systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig));
-    }
-
-    log.info("Got system factories: " + systemFactories.keySet().toString());
-    log.info("Got system admins: " + systemAdmins.keySet().toString());
-  }
-
-  /**
    * get the changelog streams and the storage factories from the config file
    * and put them into the maps
    */
@@ -175,7 +155,8 @@ public class StorageRecovery extends CommandLine {
    * get the SystemConsumers for the stores
    */
   private HashMap<String, SystemConsumer> getStoreConsumers() {
-    HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>();
+    HashMap<String, SystemConsumer> storeConsumers = new HashMap<>();
+    Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories();
 
     for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) {
       String storeSystem = entry.getValue().getSystem();
@@ -207,7 +188,7 @@ public class StorageRecovery extends CommandLine {
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void getTaskStorageManagers() {
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
@@ -249,7 +230,7 @@ public class StorageRecovery extends CommandLine {
             storeBaseDir,
             storeBaseDir,
             taskModel.getChangelogPartition(),
-            Util.javaMapAsScalaMap(systemAdmins),
+            systemAdmins,
             new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(),
             new SystemClock());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
new file mode 100644
index 0000000..ae96b2d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+
+
+public class SystemAdmins {
+  private final Map<String, SystemAdmin> systemAdminMap;
+
+  public Map<String, SystemAdmin> getSystemAdminsMap() {
+    return systemAdminMap;
+  }
+
+  public SystemAdmins(Config config) {
+    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    this.systemAdminMap = systemConfig.getSystemAdmins();
+  }
+
+  // Used only for test
+  public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) {
+    this.systemAdminMap = systemAdminMap;
+  }
+
+  public void start() {
+    for (SystemAdmin systemAdmin: systemAdminMap.values()) {
+      systemAdmin.start();
+    }
+  }
+
+  public void stop() {
+    for (SystemAdmin systemAdmin: systemAdminMap.values()) {
+      systemAdmin.stop();
+    }
+  }
+
+  public SystemAdmin getSystemAdmin(String systemName) {
+    if (!systemAdminMap.containsKey(systemName)) {
+      throw new SamzaException("Cannot get systemAdmin for system " + systemName);
+    }
+    return systemAdminMap.get(systemName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f0c2ec7..801033d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -48,8 +48,10 @@ import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.SystemClock;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +90,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final Map<String, MetricsReporter> reporters;
 
   private StreamMetadataCache streamMetadataCache = null;
+  private SystemAdmins systemAdmins = null;
   private ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
@@ -120,13 +123,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
       });
+    systemAdmins = new SystemAdmins(config);
+    streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
   }
 
   @Override
   public void start() {
     startMetrics();
-    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
-
+    systemAdmins.start();
     zkController.register();
   }
 
@@ -144,6 +148,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
+    systemAdmins.stop();
   }
 
   private void startMetrics() {
@@ -196,7 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
     if (!hasCreatedChangeLogStreams) {
-      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions);
+      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions, systemAdmins);
       hasCreatedChangeLogStreams = true;
     }
     // Assign the next version of JobModel

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 1b2ce80..4959974 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -25,6 +25,7 @@ import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemAdmins
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -75,7 +76,7 @@ object OffsetManager extends Logging {
     systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
     config: Config,
     checkpointManager: CheckpointManager = null,
-    systemAdmins: Map[String, SystemAdmin] = Map(),
+    systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
     checkpointListeners: Map[String, CheckpointListener] = Map(),
     offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
@@ -141,7 +142,7 @@ class OffsetManager(
    * SystemAdmins that are used to get next offsets from last checkpointed
    * offsets. Map is from system name to SystemAdmin class for the system.
    */
-  val systemAdmins: Map[String, SystemAdmin] = Map(),
+  val systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
 
   /**
    * Map of checkpointListeners for the systems that chose to provide one.
@@ -396,10 +397,7 @@ class OffsetManager(
         taskName -> {
           sspToOffsets.asScala.groupBy(_._1.getSystem).flatMap {
             case (systemName, systemStreamPartitionOffsets) =>
-              systemAdmins
-                .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
-                .getOffsetsAfter(systemStreamPartitionOffsets.asJava)
-                .asScala
+              systemAdmins.getSystemAdmin(systemName).getOffsetsAfter(systemStreamPartitionOffsets.asJava).asScala
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f465bfc..5664754 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -38,7 +38,7 @@ import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -60,11 +60,7 @@ object SamzaContainer extends Logging {
 
   def getLocalityManager(containerName: String, config: Config): LocalityManager = {
     val registryMap = new MetricsRegistryMap(containerName)
-    val coordinatorSystemProducer =
-      new CoordinatorStreamSystemFactory()
-        .getCoordinatorStreamSystemProducer(
-          config,
-          new SamzaContainerMetrics(containerName, registryMap).registry)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry)
     new LocalityManager(coordinatorSystemProducer)
   }
 
@@ -151,13 +147,11 @@ object SamzaContainer extends Logging {
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
       (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
     }).toMap
-
-    val systemAdmins = systemNames
-      .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config)))
-      .toMap
-
     info("Got system factories: %s" format systemFactories.keys)
 
+    val systemAdmins = new SystemAdmins(config)
+    info("Got system admins: %s" format systemAdmins.getSystemAdminsMap().keySet())
+
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
     val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams)
 
@@ -360,12 +354,9 @@ object SamzaContainer extends Logging {
     // create a map of consumers with callbacks to pass to the OffsetManager
     val checkpointListeners = consumers.filter(_._2.isInstanceOf[CheckpointListener])
       .map { case (system, consumer) => (system, consumer.asInstanceOf[CheckpointListener])}
-
     info("Got checkpointListeners : %s" format checkpointListeners)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager,
-      systemAdmins, checkpointListeners, offsetManagerMetrics)
-
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
     val dropDeserializationError = config.getDropDeserialization match {
@@ -629,6 +620,7 @@ object SamzaContainer extends Logging {
       containerContext = containerContext,
       taskInstances = taskInstances,
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
@@ -647,6 +639,7 @@ class SamzaContainer(
   containerContext: SamzaContainerContext,
   taskInstances: Map[TaskName, TaskInstance],
   runLoop: Runnable,
+  systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
@@ -686,6 +679,7 @@ class SamzaContainer(
       jmxServer = new JmxServer()
 
       startMetrics
+      startAdmins
       startOffsetManager
       startLocalityManager
       startStores
@@ -733,6 +727,7 @@ class SamzaContainer(
       shutdownOffsetManager
       shutdownMetrics
       shutdownSecurityManger
+      shutdownAdmins
 
       if (!status.equals(SamzaContainerStatus.FAILED)) {
         status = SamzaContainerStatus.STOPPED
@@ -891,6 +886,13 @@ class SamzaContainer(
     taskInstances.values.foreach(_.initTask)
   }
 
+  def startAdmins {
+    info("Starting admin multiplexer.")
+
+    systemAdmins.start
+  }
+
+
   def startProducers {
     info("Registering task instances with producers.")
 
@@ -959,6 +961,13 @@ class SamzaContainer(
     consumerMultiplexer.stop
   }
 
+  def shutdownAdmins {
+    info("Shutting down admin multiplexer.")
+
+    systemAdmins.stop
+  }
+
+
   def shutdownProducers {
     info("Shutting down producer multiplexer.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index f2a5074..c7d76c2 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -39,7 +39,7 @@ class TaskInstance(
   val taskName: TaskName,
   config: Config,
   val metrics: TaskInstanceMetrics,
-  systemAdmins: Map[String, SystemAdmin],
+  systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
   containerContext: SamzaContainerContext,
@@ -57,7 +57,7 @@ class TaskInstance(
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
-  val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
+  val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
                                     storageManager, tableManager, jobModel, streamMetadataCache)
 
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
@@ -258,7 +258,7 @@ class TaskInstance(
           val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
               .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
           val system = envelope.getSystemStreamPartition.getSystem
-          others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+          others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match {
             case null => {
               info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
               ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 99b1abe..4a804dd 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -23,14 +23,10 @@ package org.apache.samza.coordinator
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config.ClusterManagerConfig
-import org.apache.samza.config.JobConfig
+import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.MapConfig
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
-import org.apache.samza.config.StorageConfig
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper
 import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
@@ -39,7 +35,6 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
@@ -79,9 +74,8 @@ object JobModelManager extends Logging {
    *                                from the coordinator stream, and instantiate a JobModelManager.
    */
   def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
-    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
     info("Registering coordinator system stream consumer.")
     coordinatorSystemConsumer.register
     debug("Starting coordinator system stream consumer.")
@@ -103,9 +97,8 @@ object JobModelManager extends Logging {
     localityManager.start()
 
     // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = getSystemAdmins(config)
-
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
+    val systemAdmins = new SystemAdmins(config)
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
     val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
 
     val processorList = new ListBuffer[String]()
@@ -113,9 +106,8 @@ object JobModelManager extends Logging {
     for (i <- 0 until containerCount) {
       processorList += i.toString
     }
-
-    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager,
-      streamMetadataCache, processorList.toList.asJava)
+    systemAdmins.start()
+    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, processorList.toList.asJava)
     val jobModel = jobModelManager.jobModel
     // Save the changelog mapping back to the ChangelogPartitionmanager
     // newChangelogPartitionMapping is the merging of all current task:changelog
@@ -130,9 +122,10 @@ object JobModelManager extends Logging {
     info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
     changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
 
-    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
-    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
+    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
 
+    systemAdmins.stop()
     jobModelManager
   }
 
@@ -275,25 +268,7 @@ object JobModelManager extends Logging {
     }
   }
 
-  /**
-   * Instantiates the system admins based upon the system factory class available in {@param config}.
-   * @param config contains adequate information to instantiate the SystemAdmin.
-   * @return a map of SystemName(String) to the instantiated SystemAdmin.
-   */
-  def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = {
-    val systemNames = getSystemNames(config)
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-    systemAdmins
-  }
-
-  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
+  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins) {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
@@ -301,10 +276,7 @@ object JobModelManager extends Logging {
       .mapValues(Util.getSystemStreamFromNames(_))
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val systemAdmin = Util.getObj[SystemFactory](config
-        .getSystemFactory(systemStream.getSystem)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
-        ).getAdmin(systemStream.getSystem, config)
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
 
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions)
       if (systemAdmin.createStream(changelogSpec)) {
@@ -316,7 +288,7 @@ object JobModelManager extends Logging {
     }
   }
 
-  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = {
+  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins): Unit = {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
@@ -326,11 +298,7 @@ object JobModelManager extends Logging {
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val accessLog = config.getAccessLogEnabled(storeName)
       if (accessLog) {
-        val systemAdmin = Util.getObj[SystemFactory](config
-          .getSystemFactory(systemStream.getSystem)
-          .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
-        ).getAdmin(systemStream.getSystem, config)
-
+        val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
         val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream),
           config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions)
         systemAdmin.createStream(accessLogSpec)

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
deleted file mode 100644
index 9283812..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream
-
-import org.apache.samza.SamzaException
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.{Config, SystemConfig}
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemFactory, SystemStream}
-import org.apache.samza.util.Util
-
-/**
- * A helper class that does wiring for CoordinatorStreamSystemConsumer and
- * CoordinatorStreamSystemProducer. This factory should only be used in
- * situations where the underlying SystemConsumer/SystemProducer does not
- * exist.
- */
-class CoordinatorStreamSystemFactory {
-  def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
-    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
-    val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
-    new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
-  }
-
-  def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
-    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
-    val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
-    new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 0e973e9..7a250b2 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -23,7 +23,7 @@ package org.apache.samza.job
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -78,21 +78,23 @@ class JobRunner(config: Config) extends Logging {
   def run(resetJobConfig: Boolean = true) = {
     debug("config: %s" format (config))
     val jobFactory: StreamJobFactory = getJobFactory
-    val factory = new CoordinatorStreamSystemFactory
-    val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-    val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+    val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
 
     // Create the coordinator stream if it doesn't exist
     info("Creating coordinator stream")
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val coordinatorSystemStream = Util.getCoordinatorSystemStream(config)
+    val systemFactory = Util.getCoordinatorSystemFactory(config)
     val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
     val streamName = coordinatorSystemStream.getStream
     val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+    systemAdmin.start()
     if (systemAdmin.createStream(coordinatorSpec)) {
       info("Created coordinator stream %s." format streamName)
     } else {
       info("Coordinator stream %s already exists." format streamName)
     }
+    systemAdmin.stop()
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 62dcdb0..476e215 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -56,7 +56,7 @@ class TaskStorageManager(
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   partition: Partition,
-  systemAdmins: Map[String, SystemAdmin],
+  systemAdmins: SystemAdmins,
   changeLogDeleteRetentionsInMs: Map[String, Long],
   clock: Clock) extends Logging {
 
@@ -210,9 +210,7 @@ class TaskStorageManager(
     info("Validating change log streams: " + changeLogSystemStreams)
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val systemAdmin = systemAdmins
-        .getOrElse(systemStream.getSystem,
-                   throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
 
       systemAdmin.validateStream(changelogSpec)
@@ -230,8 +228,7 @@ class TaskStorageManager(
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-      val admin = systemAdmins.getOrElse(systemStream.getSystem,
-        throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val admin = systemAdmins.getSystemAdmin(systemStream.getSystem)
       val consumer = storeConsumers(storeName)
 
       val offset = getStartingOffset(systemStreamPartition, admin)
@@ -334,9 +331,7 @@ class TaskStorageManager(
     debug("Persisting logged key value stores")
 
     for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) {
-      val systemAdmin = systemAdmins
-              .getOrElse(systemStream.getSystem,
-                         throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
 
       debug("Fetching newest offset for store %s" format(storeName))
       try {
@@ -345,7 +340,7 @@ class TaskStorageManager(
           // rather than newest and oldest offsets for all SSPs. Use it if we can.
           systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3)
         } else {
-          val streamToMetadata = systemAdmins(systemStream.getSystem)
+          val streamToMetadata = systemAdmins.getSystemAdmin(systemStream.getSystem)
                   .getSystemStreamMetadata(Set(systemStream.getStream).asJava)
           val sspMetadata = streamToMetadata
                   .get(systemStream.getStream)

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 271279f..637858b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -19,27 +19,10 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging, Clock, SystemClock}
+import org.apache.samza.util.{Logging, Clock, SystemClock}
 import org.apache.samza.SamzaException
 import scala.collection.JavaConverters._
-import org.apache.samza.config.SystemConfig.Config2System
 
-object StreamMetadataCache {
-  def apply(cacheTtlMs: Int = 5000, config: Config): StreamMetadataCache = {
-    val systemNames = config.getSystemNames.toSet
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-
-    new StreamMetadataCache(systemAdmins, cacheTtlMs, SystemClock.instance)
-  }
-}
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
  * 5 seconds), so that we can make many metadata requests in quick succession without
@@ -48,7 +31,7 @@ object StreamMetadataCache {
  */
 class StreamMetadataCache (
     /** System implementations from which the actual metadata is loaded on cache miss */
-    systemAdmins: Map[String, SystemAdmin],
+    systemAdmins: SystemAdmins,
 
     /** Maximum age (in milliseconds) of a cache entry */
     val cacheTTLms: Int = 5000,
@@ -59,6 +42,7 @@ class StreamMetadataCache (
   private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
   private var cache = Map[SystemStream, CacheEntry]()
   private val lock = new Object
+
   /**
    * Returns metadata about each of the given streams (such as first offset, newest
    * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
@@ -77,8 +61,7 @@ class StreamMetadataCache (
       .groupBy[String](_.getSystem)
       .flatMap {
         case (systemName, systemStreams) =>
-          val systemAdmin = systemAdmins
-            .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
+          val systemAdmin = systemAdmins.getSystemAdmin(systemName)
           val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
             systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
           } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index b39439d..212ec05 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.system.chooser
 
-import org.apache.samza.SamzaException
+import java.util.HashMap
 import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsRegistry
@@ -72,7 +72,7 @@ class BootstrappingChooser(
    * A map from system stream name to SystemAdmin that is used for
    * offset comparisons.
    */
-  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
 
   /**
    * The number of lagging partitions for each SystemStream that's behind.
@@ -135,7 +135,7 @@ class BootstrappingChooser(
     wrapped.register(systemStreamPartition, offset)
 
     val system = systemStreamPartition.getSystem
-    val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system))
+    val systemAdmin = systemAdmins.getSystemAdmin(system)
     /**
      * SAMZA-1100: When a input SystemStream is consumed as both bootstrap and broadcast
      * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition.
@@ -198,8 +198,8 @@ class BootstrappingChooser(
           updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
         }
 
-        // If the offset we just read is the same as the offset for the last 
-        // message (newest) in this system stream partition, then we have read 
+        // If the offset we just read is the same as the offset for the last
+        // message (newest) in this system stream partition, then we have read
         // all messages, and can mark this SSP as bootstrapped.
         checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
       }
@@ -246,7 +246,7 @@ class BootstrappingChooser(
   private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
     val systemStream = systemStreamPartition.getSystemStream
     val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
-    // Metadata for system/stream, and system/stream/partition are allowed to 
+    // Metadata for system/stream, and system/stream/partition are allowed to
     // be null since not all streams are bootstrap streams.
     val systemStreamPartitionMetadata = if (systemStreamMetadata != null) {
       systemStreamMetadata
@@ -256,8 +256,8 @@ class BootstrappingChooser(
       null
     }
     val offsetToCheck = if (systemStreamPartitionMetadata == null) {
-      // Use null for offsetToCheck in cases where the partition metadata was 
-      // null. A null partition metadata implies that the stream is not a 
+      // Use null for offsetToCheck in cases where the partition metadata was
+      // null. A null partition metadata implies that the stream is not a
       // bootstrap stream, and therefore, there is no need to check its offset.
       null
     } else {
@@ -266,8 +266,8 @@ class BootstrappingChooser(
 
     trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
 
-    // The SSP is no longer lagging if the envelope's offset equals the 
-    // latest offset. 
+    // The SSP is no longer lagging if the envelope's offset equals the
+    // latest offset.
     if (offset != null && offset.equals(offsetToCheck)) {
       laggingSystemStreamPartitions -= systemStreamPartition
       systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
@@ -277,7 +277,7 @@ class BootstrappingChooser(
       if (systemStreamLagCounts(systemStream) == 0) {
         info("Bootstrap stream is fully caught up: %s" format systemStream)
 
-        // If the lag count is 0, then no partition for this stream is lagging 
+        // If the lag count is 0, then no partition for this stream is lagging
         // (the stream has been fully caught up).
         systemStreamLagCounts -= systemStream
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index c0805c4..35c68c2 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -22,14 +22,18 @@ package org.apache.samza.system.chooser
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
 import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system._
 import org.apache.samza.util.Logging
-
+import java.util.HashMap
 import scala.collection.JavaConverters._
 
 
 object DefaultChooser extends Logging {
-  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = {
+  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata],
+            chooserFactory: MessageChooserFactory,
+            config: Config,
+            registry: MetricsRegistry,
+            systemAdmins: SystemAdmins) = {
     val chooserConfig = new DefaultChooserConfig(config)
     val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
 
@@ -251,7 +255,7 @@ class DefaultChooser(
    * Defines a mapping from SystemStream name to SystemAdmin.
    * This is useful for determining if a bootstrap SystemStream is caught up.
    */
-  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
 
   val chooser = {
     val useBatching = batchSize.isDefined

http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index cc2a097..ea23760 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -223,20 +223,28 @@ object Util extends Logging {
   }
 
   /**
-   * Get the coordinator system and system factory from the configuration
+   * Get the coordinator system stream from the configuration
    * @param config
    * @return
    */
-  def getCoordinatorSystemStreamAndFactory(config: Config) = {
+  def getCoordinatorSystemStream(config: Config) = {
     val systemName = config.getCoordinatorSystemName
     val (jobName, jobId) = Util.getJobNameAndId(config)
     val streamName = Util.getCoordinatorStreamName(jobName, jobId)
-    val coordinatorSystemStream = new SystemStream(systemName, streamName)
+    new SystemStream(systemName, streamName)
+  }
+
+  /**
+    * Get the coordinator system factory from the configuration
+    * @param config
+    * @return
+    */
+  def getCoordinatorSystemFactory(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
     val systemFactoryClassName = config
       .getSystemFactory(systemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-    (coordinatorSystemStream, systemFactory)
+    Util.getObj[SystemFactory](systemFactoryClassName)
   }
 
   /**