You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/09/19 19:16:20 UTC

samza git commit: SAMZA-1859: Zookeeper implementation of MetadataStore.

Repository: samza
Updated Branches:
  refs/heads/master 19c6f4f61 -> 160927ada


SAMZA-1859: Zookeeper implementation of MetadataStore.

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Daniel Nishimura <dn...@linkedin.com>

Closes #629 from shanthoosh/metadata_store_zk_impl


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/160927ad
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/160927ad
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/160927ad

Branch: refs/heads/master
Commit: 160927adad9843cf5110b341cb0b67413f75249d
Parents: 19c6f4f
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Wed Sep 19 12:16:15 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 19 12:16:15 2018 -0700

----------------------------------------------------------------------
 .../samza/metadatastore/MetadataStore.java      |   6 +-
 .../apache/samza/container/LocalityManager.java |   2 +-
 .../grouper/task/TaskAssignmentManager.java     |   2 +-
 .../metadatastore/CoordinatorStreamStore.java   |   2 +-
 .../java/org/apache/samza/zk/ProcessorData.java |  19 +--
 .../samza/zk/ZkJobCoordinatorFactory.java       |  17 +--
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |  30 +++--
 .../org/apache/samza/zk/ZkMetadataStore.java    | 132 +++++++++++++++++++
 .../apache/samza/zk/ZkMetadataStoreFactory.java |  36 +++++
 .../apache/samza/container/SamzaContainer.scala |  26 ++--
 .../TestCoordinatorStreamStore.java             |   2 +-
 .../org/apache/samza/zk/TestZkKeyBuilder.java   |   2 +-
 .../apache/samza/zk/TestZkMetadataStore.java    | 121 +++++++++++++++++
 13 files changed, 340 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index aaa420b..cd04794 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -19,8 +19,6 @@
 package org.apache.samza.metadatastore;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
 import java.util.Map;
 
 /**
@@ -32,10 +30,8 @@ public interface MetadataStore {
   /**
    * Initializes the metadata store, if applicable, setting up the underlying resources
    * and connections to the store endpoints.
-   *
-   * @param config the configuration for instantiating the MetadataStore.
    */
-  void init(Config config, MetricsRegistry metricsRegistry);
+  void init();
 
   /**
    * Gets the value associated with the specified {@code key}.

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index c70b15a..20e86d9 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -78,7 +78,7 @@ public class LocalityManager {
     this.config = config;
     MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
     this.metadataStore = metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, metricsRegistry);
-    this.metadataStore.init(config, metricsRegistry);
+    this.metadataStore.init();
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde);

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 42a6e81..2bfd4c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -84,7 +84,7 @@ public class TaskAssignmentManager {
   }
 
   public void init(Config config, MetricsRegistry metricsRegistry) {
-    this.metadataStore.init(config, metricsRegistry);
+    this.metadataStore.init();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index d74188b..af5e2f9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -89,7 +89,7 @@ public class CoordinatorStreamStore implements MetadataStore {
   }
 
   @Override
-  public void init(Config config, MetricsRegistry metricsRegistry) {
+  public void init() {
     if (isInitialized.compareAndSet(false, true)) {
       LOG.info("Starting the coordinator stream system consumer with config: {}.", config);
       registerConsumer();

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
index a48a450..91ba33d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
@@ -21,17 +21,18 @@ package org.apache.samza.zk;
 
 import java.util.Objects;
 import org.apache.samza.SamzaException;
+import org.apache.samza.runtime.LocationId;
 
 /**
  * Represents processor data stored in zookeeper processors node.
  */
 public class ProcessorData {
   private final String processorId;
-  private final String host;
+  private final String locationId;
 
-  public ProcessorData(String host, String processorId) {
+  public ProcessorData(String locationId, String processorId) {
     this.processorId = processorId;
-    this.host = host;
+    this.locationId = locationId;
   }
 
   public ProcessorData(String data) {
@@ -39,16 +40,16 @@ public class ProcessorData {
     if (splt.length != 2) {
       throw new SamzaException("incorrect processor data format = " + data);
     }
-    host = splt[0];
+    locationId = splt[0];
     processorId = splt[1];
   }
 
   public String toString() {
-    return host + " " + processorId;
+    return locationId + " " + processorId;
   }
 
-  public String getHost() {
-    return host;
+  public LocationId getLocationId() {
+    return new LocationId(locationId);
   }
 
   public String getProcessorId() {
@@ -57,7 +58,7 @@ public class ProcessorData {
 
   @Override
   public int hashCode() {
-    return Objects.hash(processorId, host);
+    return Objects.hash(processorId, locationId);
   }
 
   @Override
@@ -65,6 +66,6 @@ public class ProcessorData {
     if (obj == null) return false;
     if (getClass() != obj.getClass()) return false;
     final ProcessorData other = (ProcessorData) obj;
-    return Objects.equals(processorId, other.processorId) && Objects.equals(host, other.host);
+    return Objects.equals(processorId, other.processorId) && Objects.equals(locationId, other.locationId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 6888df0..3dad6c1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -31,7 +31,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
@@ -40,22 +39,24 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   private static final String DEFAULT_JOB_NAME = "defaultJob";
 
   /**
-   * Method to instantiate an implementation of JobCoordinator
+   * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
    *
-   * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
-   * @return An instance of IJobCoordinator
+   * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
+   * @return An instance of {@link ZkJobCoordinator}
    */
   @Override
   public JobCoordinator getJobCoordinator(Config config) {
+    // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
     MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    ZkUtils zkUtils = getZkUtils(config, metricsRegistry);
-    LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config);
+    String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
+    ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
+    LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
     return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
   }
 
-  private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
+  private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(coordinatorZkBasePath);
     ZkClient zkClient = ZkCoordinationUtilsFactory
         .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
     return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), metricsRegistry);

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 37bff6d..16efe81 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -42,16 +42,18 @@ import com.google.common.base.Strings;
  * This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
  */
 public class ZkKeyBuilder {
+
+  static final String PROCESSORS_PATH = "processors";
+  static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration";
+  static final String JOB_MODEL_UPGRADE_BARRIER_PATH = "jobModelUpgradeBarrier";
+  private static final String TASK_LOCALITY_PATH = "taskLocality";
+
   /**
    * Prefix generated to uniquely identify a particular deployment of a job.
    * TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well.
    */
   private final String pathPrefix;
 
-  static final String PROCESSORS_PATH = "processors";
-  static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
-  static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
-
   public ZkKeyBuilder(String pathPrefix) {
     if (pathPrefix != null && !pathPrefix.trim().isEmpty()) {
       this.pathPrefix = pathPrefix.trim();
@@ -60,11 +62,11 @@ public class ZkKeyBuilder {
     }
   }
 
-  public String getRootPath() {
+  String getRootPath() {
     return "/" + pathPrefix;
   }
 
-  public String getProcessorsPath() {
+  String getProcessorsPath() {
     return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
   }
 
@@ -77,25 +79,29 @@ public class ZkKeyBuilder {
    * @param path Full ZK path of a registered processor
    * @return String representing the processor ID
    */
-  public static String parseIdFromPath(String path) {
+  static String parseIdFromPath(String path) {
     if (!Strings.isNullOrEmpty(path))
       return path.substring(path.lastIndexOf("/") + 1);
     return null;
   }
 
-  public String getJobModelVersionPath() {
+  String getJobModelVersionPath() {
     return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
 
-  public String getJobModelPathPrefix() {
+  String getJobModelPathPrefix() {
     return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
 
-  public String getJobModelPath(String jobModelVersion) {
+  String getJobModelPath(String jobModelVersion) {
     return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
   }
 
-  public String getJobModelVersionBarrierPrefix() {
-    return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER);
+  String getJobModelVersionBarrierPrefix() {
+    return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER_PATH);
+  }
+
+  String getTaskLocalityPath() {
+    return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
new file mode 100644
index 0000000..4cfdc8d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.SamzaException;
+import org.I0Itec.zkclient.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link MetadataStore} interface where the
+ * metadata of the Samza job is stored in zookeeper.
+ */
+public class ZkMetadataStore implements MetadataStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZkMetadataStore.class);
+
+  private final ZkClient zkClient;
+  private final ZkConfig zkConfig;
+  private final String zkBaseDir;
+
+  public ZkMetadataStore(String zkBaseDir, Config config, MetricsRegistry metricsRegistry) {
+    this.zkConfig = new ZkConfig(config);
+    this.zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs(), new BytesPushThroughSerializer());
+    this.zkBaseDir = zkBaseDir;
+    zkClient.createPersistent(zkBaseDir, true);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void init() {
+    zkClient.waitUntilConnected(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] get(byte[] key) {
+    return zkClient.readData(getZkPathForKey(key), true);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void put(byte[] key, byte[] value) {
+    String zkPath = getZkPathForKey(key);
+    zkClient.createPersistent(zkPath, true);
+    zkClient.writeData(zkPath, value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void delete(byte[] key) {
+    zkClient.delete(getZkPathForKey(key));
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws SamzaException if there're exceptions reading data from zookeeper.
+   */
+  @Override
+  public Map<byte[], byte[]> all() {
+    try {
+      List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
+      Map<byte[], byte[]> result = new HashMap<>();
+      for (String zkSubDir : zkSubDirectories) {
+        String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
+        byte[] value = zkClient.readData(completeZkPath, true);
+        if (value != null) {
+          result.put(completeZkPath.getBytes("UTF-8"), value);
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      String errorMsg = String.format("Error reading path: %s from zookeeper.", zkBaseDir);
+      LOG.error(errorMsg, e);
+      throw new SamzaException(errorMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() {
+    // No-op for zookeeper implementation.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    zkClient.close();
+  }
+
+  private String getZkPathForKey(byte[] key) {
+    return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
new file mode 100644
index 0000000..a9c979d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Builds the {@link ZkMetadataStore} based upon the provided {@link Config}
+ * and {@link MetricsRegistry}.
+ */
+public class ZkMetadataStoreFactory implements MetadataStoreFactory {
+
+  @Override
+  public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
+    return new ZkMetadataStore(namespace, config, metricsRegistry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 68de4a6..7b64f5e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -129,12 +129,6 @@ object SamzaContainer extends Logging {
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
 
-    var localityManager: LocalityManager = null
-    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      val registryMap = new MetricsRegistryMap(containerName)
-      localityManager = new LocalityManager(config, registryMap)
-    }
-
     val containerPID = ManagementFactory.getRuntimeMXBean().getName()
 
     info("Setting up Samza container: %s" format containerName)
@@ -719,7 +713,6 @@ object SamzaContainer extends Logging {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
-      localityManager = localityManager,
       securityManager = securityManager,
       metrics = samzaContainerMetrics,
       reporters = reporters,
@@ -799,7 +792,7 @@ class SamzaContainer(
       startDiagnostics
       startAdmins
       startOffsetManager
-      startLocalityManager
+      storeContainerLocality
       startStores
       startTableManager
       startDiskSpaceMonitor
@@ -841,7 +834,6 @@ class SamzaContainer(
       shutdownDiskSpaceMonitor
       shutdownHostStatisticsMonitor
       shutdownProducers
-      shutdownLocalityManager
       shutdownOffsetManager
       shutdownMetrics
       shutdownSecurityManger
@@ -961,8 +953,10 @@ class SamzaContainer(
     offsetManager.start
   }
 
-  def startLocalityManager {
-    if(localityManager != null) {
+  def storeContainerLocality {
+    val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled
+    if (isHostAffinityEnabled) {
+      val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry)
       val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
       info("Registering %s with metadata store" format containerName)
       try {
@@ -978,6 +972,9 @@ class SamzaContainer(
         case unknownException: Throwable =>
           warn("Received an exception when persisting locality info for container %s: " +
             "%s" format (containerContext.id, unknownException.getMessage))
+      } finally {
+        info("Shutting down locality manager.")
+        localityManager.close()
       }
     }
   }
@@ -1145,13 +1142,6 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownTableManager)
   }
 
-  def shutdownLocalityManager {
-    if(localityManager != null) {
-      info("Shutting down locality manager.")
-      localityManager.close()
-    }
-  }
-
   def shutdownOffsetManager {
     info("Shutting down offset manager.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index da2d984..0e48363 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -57,7 +57,7 @@ public class TestCoordinatorStreamStore {
     when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test"));
     when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
     coordinatorStreamStore = new CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap), new MetricsRegistryMap());
-    coordinatorStreamStore.init(new MapConfig(), new MetricsRegistryMap());
+    coordinatorStreamStore.init();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 8ddd688..d2175b2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -56,6 +56,6 @@ public class TestZkKeyBuilder {
     Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
     String version = "2";
     Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
-    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
new file mode 100644
index 0000000..5930c65
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZkMetadataStore {
+
+  private static final String LOCALHOST = "127.0.0.1";
+
+  private static EmbeddedZookeeper zkServer;
+
+  private MetadataStore zkMetadataStore;
+
+  @BeforeClass
+  public static void beforeClass() {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    zkServer.teardown();
+  }
+
+  @Before
+  public void beforeTest() {
+    String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort());
+    Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString));
+    zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap());
+  }
+
+  @After
+  public void afterTest() {
+    zkMetadataStore.close();
+  }
+
+  @Test
+  public void testReadAfterWrite() throws Exception {
+    byte[] key = "test-key1".getBytes("UTF-8");
+    byte[] value = "test-value1".getBytes("UTF-8");
+    Assert.assertNull(zkMetadataStore.get(key));
+    zkMetadataStore.put(key, value);
+    Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
+    Assert.assertEquals(1, zkMetadataStore.all().size());
+  }
+
+  @Test
+  public void testReadAfterDelete() throws Exception {
+    byte[] key = "test-key1".getBytes("UTF-8");
+    byte[] value = "test-value1".getBytes("UTF-8");
+    Assert.assertNull(zkMetadataStore.get(key));
+    zkMetadataStore.put(key, value);
+    Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
+    zkMetadataStore.delete(key);
+    Assert.assertNull(zkMetadataStore.get(key));
+    Assert.assertEquals(0, zkMetadataStore.all().size());
+  }
+
+  @Test
+  public void testReadOfNonExistentKey() throws Exception {
+    Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8")));
+    Assert.assertEquals(0, zkMetadataStore.all().size());
+  }
+
+  @Test
+  public void testMultipleUpdatesForSameKey() throws Exception {
+    byte[] key = "test-key1".getBytes("UTF-8");
+    byte[] value = "test-value1".getBytes("UTF-8");
+    byte[] value1 = "test-value2".getBytes("UTF-8");
+    zkMetadataStore.put(key, value);
+    zkMetadataStore.put(key, value1);
+    Assert.assertTrue(Arrays.equals(value1, zkMetadataStore.get(key)));
+    Assert.assertEquals(1, zkMetadataStore.all().size());
+  }
+
+  @Test
+  public void testAllEntries() throws Exception {
+    byte[] key = "test-key1".getBytes("UTF-8");
+    byte[] key1 = "test-key2".getBytes("UTF-8");
+    byte[] key2 = "test-key3".getBytes("UTF-8");
+    byte[] value = "test-value1".getBytes("UTF-8");
+    byte[] value1 = "test-value2".getBytes("UTF-8");
+    byte[] value2 = "test-value3".getBytes("UTF-8");
+    zkMetadataStore.put(key, value);
+    zkMetadataStore.put(key1, value1);
+    zkMetadataStore.put(key2, value2);
+    ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+    Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
+  }
+}