You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/09/19 19:16:20 UTC
samza git commit: SAMZA-1859: Zookeeper implementation of
MetadataStore.
Repository: samza
Updated Branches:
refs/heads/master 19c6f4f61 -> 160927ada
SAMZA-1859: Zookeeper implementation of MetadataStore.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Daniel Nishumura <dn...@linkedin.com>
Closes #629 from shanthoosh/metadata_store_zk_impl
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/160927ad
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/160927ad
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/160927ad
Branch: refs/heads/master
Commit: 160927adad9843cf5110b341cb0b67413f75249d
Parents: 19c6f4f
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Wed Sep 19 12:16:15 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 19 12:16:15 2018 -0700
----------------------------------------------------------------------
.../samza/metadatastore/MetadataStore.java | 6 +-
.../apache/samza/container/LocalityManager.java | 2 +-
.../grouper/task/TaskAssignmentManager.java | 2 +-
.../metadatastore/CoordinatorStreamStore.java | 2 +-
.../java/org/apache/samza/zk/ProcessorData.java | 19 +--
.../samza/zk/ZkJobCoordinatorFactory.java | 17 +--
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 30 +++--
.../org/apache/samza/zk/ZkMetadataStore.java | 132 +++++++++++++++++++
.../apache/samza/zk/ZkMetadataStoreFactory.java | 36 +++++
.../apache/samza/container/SamzaContainer.scala | 26 ++--
.../TestCoordinatorStreamStore.java | 2 +-
.../org/apache/samza/zk/TestZkKeyBuilder.java | 2 +-
.../apache/samza/zk/TestZkMetadataStore.java | 121 +++++++++++++++++
13 files changed, 340 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index aaa420b..cd04794 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -19,8 +19,6 @@
package org.apache.samza.metadatastore;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
import java.util.Map;
/**
@@ -32,10 +30,8 @@ public interface MetadataStore {
/**
* Initializes the metadata store, if applicable, setting up the underlying resources
* and connections to the store endpoints.
- *
- * @param config the configuration for instantiating the MetadataStore.
*/
- void init(Config config, MetricsRegistry metricsRegistry);
+ void init();
/**
* Gets the value associated with the specified {@code key}.
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index c70b15a..20e86d9 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -78,7 +78,7 @@ public class LocalityManager {
this.config = config;
MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
this.metadataStore = metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, metricsRegistry);
- this.metadataStore.init(config, metricsRegistry);
+ this.metadataStore.init();
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde);
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 42a6e81..2bfd4c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -84,7 +84,7 @@ public class TaskAssignmentManager {
}
public void init(Config config, MetricsRegistry metricsRegistry) {
- this.metadataStore.init(config, metricsRegistry);
+ this.metadataStore.init();
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index d74188b..af5e2f9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -89,7 +89,7 @@ public class CoordinatorStreamStore implements MetadataStore {
}
@Override
- public void init(Config config, MetricsRegistry metricsRegistry) {
+ public void init() {
if (isInitialized.compareAndSet(false, true)) {
LOG.info("Starting the coordinator stream system consumer with config: {}.", config);
registerConsumer();
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
index a48a450..91ba33d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
@@ -21,17 +21,18 @@ package org.apache.samza.zk;
import java.util.Objects;
import org.apache.samza.SamzaException;
+import org.apache.samza.runtime.LocationId;
/**
* Represents processor data stored in zookeeper processors node.
*/
public class ProcessorData {
private final String processorId;
- private final String host;
+ private final String locationId;
- public ProcessorData(String host, String processorId) {
+ public ProcessorData(String locationId, String processorId) {
this.processorId = processorId;
- this.host = host;
+ this.locationId = locationId;
}
public ProcessorData(String data) {
@@ -39,16 +40,16 @@ public class ProcessorData {
if (splt.length != 2) {
throw new SamzaException("incorrect processor data format = " + data);
}
- host = splt[0];
+ locationId = splt[0];
processorId = splt[1];
}
public String toString() {
- return host + " " + processorId;
+ return locationId + " " + processorId;
}
- public String getHost() {
- return host;
+ public LocationId getLocationId() {
+ return new LocationId(locationId);
}
public String getProcessorId() {
@@ -57,7 +58,7 @@ public class ProcessorData {
@Override
public int hashCode() {
- return Objects.hash(processorId, host);
+ return Objects.hash(processorId, locationId);
}
@Override
@@ -65,6 +66,6 @@ public class ProcessorData {
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
final ProcessorData other = (ProcessorData) obj;
- return Objects.equals(processorId, other.processorId) && Objects.equals(host, other.host);
+ return Objects.equals(processorId, other.processorId) && Objects.equals(locationId, other.locationId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 6888df0..3dad6c1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -31,7 +31,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
@@ -40,22 +39,24 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
private static final String DEFAULT_JOB_NAME = "defaultJob";
/**
- * Method to instantiate an implementation of JobCoordinator
+ * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
- * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
- * @return An instance of IJobCoordinator
+ * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
+ * @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
+ // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
- ZkUtils zkUtils = getZkUtils(config, metricsRegistry);
- LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config);
+ String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
+ ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
+ LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
}
- private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
+ private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) {
ZkConfig zkConfig = new ZkConfig(config);
- ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder(coordinatorZkBasePath);
ZkClient zkClient = ZkCoordinationUtilsFactory
.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), metricsRegistry);
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 37bff6d..16efe81 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -42,16 +42,18 @@ import com.google.common.base.Strings;
* This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
*/
public class ZkKeyBuilder {
+
+ static final String PROCESSORS_PATH = "processors";
+ static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration";
+ static final String JOB_MODEL_UPGRADE_BARRIER_PATH = "jobModelUpgradeBarrier";
+ private static final String TASK_LOCALITY_PATH = "taskLocality";
+
/**
* Prefix generated to uniquely identify a particular deployment of a job.
* TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well.
*/
private final String pathPrefix;
- static final String PROCESSORS_PATH = "processors";
- static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
- static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
-
public ZkKeyBuilder(String pathPrefix) {
if (pathPrefix != null && !pathPrefix.trim().isEmpty()) {
this.pathPrefix = pathPrefix.trim();
@@ -60,11 +62,11 @@ public class ZkKeyBuilder {
}
}
- public String getRootPath() {
+ String getRootPath() {
return "/" + pathPrefix;
}
- public String getProcessorsPath() {
+ String getProcessorsPath() {
return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
}
@@ -77,25 +79,29 @@ public class ZkKeyBuilder {
* @param path Full ZK path of a registered processor
* @return String representing the processor ID
*/
- public static String parseIdFromPath(String path) {
+ static String parseIdFromPath(String path) {
if (!Strings.isNullOrEmpty(path))
return path.substring(path.lastIndexOf("/") + 1);
return null;
}
- public String getJobModelVersionPath() {
+ String getJobModelVersionPath() {
return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
}
- public String getJobModelPathPrefix() {
+ String getJobModelPathPrefix() {
return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
}
- public String getJobModelPath(String jobModelVersion) {
+ String getJobModelPath(String jobModelVersion) {
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
- public String getJobModelVersionBarrierPrefix() {
- return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER);
+ String getJobModelVersionBarrierPrefix() {
+ return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER_PATH);
+ }
+
+ String getTaskLocalityPath() {
+ return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
new file mode 100644
index 0000000..4cfdc8d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.SamzaException;
+import org.I0Itec.zkclient.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link MetadataStore} interface where the
+ * metadata of the Samza job is stored in zookeeper.
+ */
+public class ZkMetadataStore implements MetadataStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZkMetadataStore.class);
+
+ private final ZkClient zkClient;
+ private final ZkConfig zkConfig;
+ private final String zkBaseDir;
+
+ public ZkMetadataStore(String zkBaseDir, Config config, MetricsRegistry metricsRegistry) {
+ this.zkConfig = new ZkConfig(config);
+ this.zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs(), new BytesPushThroughSerializer());
+ this.zkBaseDir = zkBaseDir;
+ zkClient.createPersistent(zkBaseDir, true);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init() {
+ zkClient.waitUntilConnected(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] get(byte[] key) {
+ return zkClient.readData(getZkPathForKey(key), true);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void put(byte[] key, byte[] value) {
+ String zkPath = getZkPathForKey(key);
+ zkClient.createPersistent(zkPath, true);
+ zkClient.writeData(zkPath, value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void delete(byte[] key) {
+ zkClient.delete(getZkPathForKey(key));
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws SamzaException if there're exceptions reading data from zookeeper.
+ */
+ @Override
+ public Map<byte[], byte[]> all() {
+ try {
+ List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
+ Map<byte[], byte[]> result = new HashMap<>();
+ for (String zkSubDir : zkSubDirectories) {
+ String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
+ byte[] value = zkClient.readData(completeZkPath, true);
+ if (value != null) {
+ result.put(completeZkPath.getBytes("UTF-8"), value);
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ String errorMsg = String.format("Error reading path: %s from zookeeper.", zkBaseDir);
+ LOG.error(errorMsg, e);
+ throw new SamzaException(errorMsg, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flush() {
+ // No-op for zookeeper implementation.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ zkClient.close();
+ }
+
+ private String getZkPathForKey(byte[] key) {
+ return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
new file mode 100644
index 0000000..a9c979d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Builds the {@link ZkMetadataStore} based upon the provided {@link Config}
+ * and {@link MetricsRegistry}.
+ */
+public class ZkMetadataStoreFactory implements MetadataStoreFactory {
+
+ @Override
+ public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
+ return new ZkMetadataStore(namespace, config, metricsRegistry);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 68de4a6..7b64f5e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -129,12 +129,6 @@ object SamzaContainer extends Logging {
val containerName = "samza-container-%s" format containerId
val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
- var localityManager: LocalityManager = null
- if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
- val registryMap = new MetricsRegistryMap(containerName)
- localityManager = new LocalityManager(config, registryMap)
- }
-
val containerPID = ManagementFactory.getRuntimeMXBean().getName()
info("Setting up Samza container: %s" format containerName)
@@ -719,7 +713,6 @@ object SamzaContainer extends Logging {
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
offsetManager = offsetManager,
- localityManager = localityManager,
securityManager = securityManager,
metrics = samzaContainerMetrics,
reporters = reporters,
@@ -799,7 +792,7 @@ class SamzaContainer(
startDiagnostics
startAdmins
startOffsetManager
- startLocalityManager
+ storeContainerLocality
startStores
startTableManager
startDiskSpaceMonitor
@@ -841,7 +834,6 @@ class SamzaContainer(
shutdownDiskSpaceMonitor
shutdownHostStatisticsMonitor
shutdownProducers
- shutdownLocalityManager
shutdownOffsetManager
shutdownMetrics
shutdownSecurityManger
@@ -961,8 +953,10 @@ class SamzaContainer(
offsetManager.start
}
- def startLocalityManager {
- if(localityManager != null) {
+ def storeContainerLocality {
+ val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled
+ if (isHostAffinityEnabled) {
+ val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry)
val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
info("Registering %s with metadata store" format containerName)
try {
@@ -978,6 +972,9 @@ class SamzaContainer(
case unknownException: Throwable =>
warn("Received an exception when persisting locality info for container %s: " +
"%s" format (containerContext.id, unknownException.getMessage))
+ } finally {
+ info("Shutting down locality manager.")
+ localityManager.close()
}
}
}
@@ -1145,13 +1142,6 @@ class SamzaContainer(
taskInstances.values.foreach(_.shutdownTableManager)
}
- def shutdownLocalityManager {
- if(localityManager != null) {
- info("Shutting down locality manager.")
- localityManager.close()
- }
- }
-
def shutdownOffsetManager {
info("Shutting down offset manager.")
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index da2d984..0e48363 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -57,7 +57,7 @@ public class TestCoordinatorStreamStore {
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test"));
when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
coordinatorStreamStore = new CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap), new MetricsRegistryMap());
- coordinatorStreamStore.init(new MapConfig(), new MetricsRegistryMap());
+ coordinatorStreamStore.init();
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 8ddd688..d2175b2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -56,6 +56,6 @@ public class TestZkKeyBuilder {
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
String version = "2";
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
- Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
new file mode 100644
index 0000000..5930c65
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZkMetadataStore {
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static EmbeddedZookeeper zkServer;
+
+ private MetadataStore zkMetadataStore;
+
+ @BeforeClass
+ public static void beforeClass() {
+ zkServer = new EmbeddedZookeeper();
+ zkServer.setup();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ zkServer.teardown();
+ }
+
+ @Before
+ public void beforeTest() {
+ String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort());
+ Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString));
+ zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap());
+ }
+
+ @After
+ public void afterTest() {
+ zkMetadataStore.close();
+ }
+
+ @Test
+ public void testReadAfterWrite() throws Exception {
+ byte[] key = "test-key1".getBytes("UTF-8");
+ byte[] value = "test-value1".getBytes("UTF-8");
+ Assert.assertNull(zkMetadataStore.get(key));
+ zkMetadataStore.put(key, value);
+ Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
+ Assert.assertEquals(1, zkMetadataStore.all().size());
+ }
+
+ @Test
+ public void testReadAfterDelete() throws Exception {
+ byte[] key = "test-key1".getBytes("UTF-8");
+ byte[] value = "test-value1".getBytes("UTF-8");
+ Assert.assertNull(zkMetadataStore.get(key));
+ zkMetadataStore.put(key, value);
+ Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
+ zkMetadataStore.delete(key);
+ Assert.assertNull(zkMetadataStore.get(key));
+ Assert.assertEquals(0, zkMetadataStore.all().size());
+ }
+
+ @Test
+ public void testReadOfNonExistentKey() throws Exception {
+ Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8")));
+ Assert.assertEquals(0, zkMetadataStore.all().size());
+ }
+
+ @Test
+ public void testMultipleUpdatesForSameKey() throws Exception {
+ byte[] key = "test-key1".getBytes("UTF-8");
+ byte[] value = "test-value1".getBytes("UTF-8");
+ byte[] value1 = "test-value2".getBytes("UTF-8");
+ zkMetadataStore.put(key, value);
+ zkMetadataStore.put(key, value1);
+ Assert.assertTrue(Arrays.equals(value1, zkMetadataStore.get(key)));
+ Assert.assertEquals(1, zkMetadataStore.all().size());
+ }
+
+ @Test
+ public void testAllEntries() throws Exception {
+ byte[] key = "test-key1".getBytes("UTF-8");
+ byte[] key1 = "test-key2".getBytes("UTF-8");
+ byte[] key2 = "test-key3".getBytes("UTF-8");
+ byte[] value = "test-value1".getBytes("UTF-8");
+ byte[] value1 = "test-value2".getBytes("UTF-8");
+ byte[] value2 = "test-value3".getBytes("UTF-8");
+ zkMetadataStore.put(key, value);
+ zkMetadataStore.put(key1, value1);
+ zkMetadataStore.put(key2, value2);
+ ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+ Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
+ }
+}