You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/27 21:37:18 UTC
samza git commit: SAMZA-1210: Fixing merge issue and container id generations.
Repository: samza
Updated Branches:
refs/heads/master a2220ed23 -> 05060edfe
SAMZA-1210: Fixing merge issue and container id generations.
This PR is just for fixing issues introduced by a merge and changing container id generation.
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #122 from sborya/mergeFixes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/05060edf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/05060edf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/05060edf
Branch: refs/heads/master
Commit: 05060edfe18a47075dfe80059b0e83102063f587
Parents: a2220ed
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Apr 27 14:37:08 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Thu Apr 27 14:37:08 2017 -0700
----------------------------------------------------------------------
.../apache/samza/config/ApplicationConfig.java | 8 +++
.../samza/runtime/LocalApplicationRunner.java | 3 +-
.../java/org/apache/samza/zk/ProcessorData.java | 54 +++++++++++++++++
.../org/apache/samza/zk/ZkControllerImpl.java | 14 +++--
.../org/apache/samza/zk/ZkJobCoordinator.java | 61 +++++++++++--------
.../samza/zk/ZkJobCoordinatorFactory.java | 17 +-----
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 16 +++--
.../org/apache/samza/zk/ZkLeaderElector.java | 4 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 62 ++++++++++++++++----
.../org/apache/samza/zk/TestZkKeyBuilder.java | 9 +--
.../apache/samza/zk/TestZkLeaderElector.java | 30 +++++-----
.../java/org/apache/samza/zk/TestZkUtils.java | 30 ++++++++--
12 files changed, 218 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 9eb4161..ea0f999 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -72,6 +72,14 @@ public class ApplicationConfig extends MapConfig {
return get(APP_CLASS, null);
}
+ /**
+ * returns full application id
+ * @return full app id
+ */
+ public String getGlobalAppId() {
+ return String.format("app-%s-%s", getAppName(), getAppId());
+ }
+
@Deprecated
public String getProcessorId() {
return get(PROCESSOR_ID, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index adc76d5..5e83c3c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -161,8 +161,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
String clazz = appConfig.getCoordinationServiceFactoryClass();
if (clazz != null) {
CoordinationServiceFactory factory = ClassLoaderHelper.fromClassName(clazz);
- String groupId = String.format("app-%s-%s", appConfig.getAppName(), appConfig.getAppId());
- return factory.getCoordinationService(groupId, uid, config);
+ return factory.getCoordinationService(appConfig.getGlobalAppId(), uid, config);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
new file mode 100644
index 0000000..3f4fd0b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.apache.samza.SamzaException;
+
+
+public class ProcessorData {
+ private final String processorId;
+ private final String host;
+
+ public ProcessorData(String host, String processorId) {
+ this.processorId = processorId;
+ this.host = host;
+ }
+
+ public ProcessorData(String data) {
+ String[] splt = data.split(" ");
+ if (splt.length != 2) {
+ throw new SamzaException("incorrect processor data format = " + data);
+ }
+ host = splt[0];
+ processorId = splt[1];
+ }
+
+ public String toString() {
+ return host + " " + processorId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getProcessorId() {
+ return processorId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 4570a62..61f7876 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -52,8 +52,10 @@ public class ZkControllerImpl implements ZkController {
private void init() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.makeSurePersistentPathsExists(
- new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
- .getJobModelPathPrefix()});
+ new String[]{
+ keyBuilder.getProcessorsPath(),
+ keyBuilder.getJobModelVersionPath(),
+ keyBuilder.getJobModelPathPrefix()});
}
private void onBecomeLeader() {
@@ -114,16 +116,16 @@ public class ZkControllerImpl implements ZkController {
* Called when the children of the given path changed.
*
* @param parentPath The parent path
- * @param currentChilds The children or null if the root node (parent path) was deleted.
+ * @param currentChildren The children or null if the root node (parent path) was deleted.
* @throws Exception
*/
@Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
LOG.info(
"ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: "
- + currentChilds);
+ + currentChildren);
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
- ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
+ ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChildren));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 884c471..2535654 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
@@ -47,7 +49,7 @@ import org.slf4j.LoggerFactory;
*/
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
- private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
+ private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
private final ZkUtils zkUtils;
private final String processorId;
@@ -61,23 +63,24 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private final CoordinationUtils coordinationUtils;
private JobModel newJobModel;
- private String newJobModelVersion; // version published in ZK (by the leader)
private JobModel jobModel;
- public ZkJobCoordinator(String processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+ public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer,
SamzaContainerController containerController) {
- this.processorId = processorId;
- this.zkUtils = zkUtils;
- this.keyBuilder = zkUtils.getKeyBuilder();
this.debounceTimer = debounceTimer;
this.containerController = containerController;
- this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
this.config = config;
+ this.processorId = processorId;
+
this.coordinationUtils = Util.
<CoordinationServiceFactory>getObj(
new JobCoordinatorConfig(config)
.getJobCoordinationServiceFactoryClassName())
- .getCoordinationService(groupId, String.valueOf(processorId), config);
+ .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
+
+ this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
+ this.keyBuilder = zkUtils.getKeyBuilder();
+ this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
streamMetadataCache = getStreamMetadataCache();
}
@@ -141,28 +144,27 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
@Override
- public void onProcessorChange(List<String> processorIds) {
- log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray()));
- generateNewJobModel();
+ public void onProcessorChange(List<String> processors) {
+ log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
+ // if list of processors is empty - it means we are called from 'onBecomeLeader'
+ generateNewJobModel(processors);
}
@Override
public void onNewJobModelAvailable(final String version) {
- newJobModelVersion = version;
log.info("pid=" + processorId + "new JobModel available");
// stop current work
containerController.stopContainer();
log.info("pid=" + processorId + "new JobModel available.Container stopped.");
// get the new job model
newJobModel = zkUtils.getJobModel(version);
- log.info("pid=" + processorId + "new JobModel available. ver=" + version + "; jm = " + newJobModel);
- String currentPath = zkUtils.getEphemeralPath();
- String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+ log.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
// update ZK and wait for all the processors to get this new version
- ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
- barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
+ ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(
+ JOB_MODEL_UPGRADE_BARRIER);
+ barrier.waitForBarrier(version, processorId, new Runnable() {
@Override
public void run() {
onNewJobModelConfirmed(version);
@@ -185,9 +187,16 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
- private void generateNewJobModel() {
- // get the current list of processors
- List<String> currentProcessors = zkUtils.getSortedActiveProcessors();
+ private void generateNewJobModel(List<String> processors) {
+ List<String> currentProcessorsIds;
+ if (processors.size() > 0) {
+ // we should use this list
+ // but it needs to be converted into PIDs, which is part of the data
+ currentProcessorsIds = zkUtils.getActiveProcessorsIDs(processors);
+ } else {
+ // get the current list of processors
+ currentProcessorsIds = zkUtils.getSortedActiveProcessorsIDs();
+ }
// get the current version
String currentJMVersion = zkUtils.getJobModelVersion();
@@ -200,10 +209,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
- List<String> containerIds = new ArrayList<>();
- for (String processor : currentProcessors) {
- String zkProcessorId = ZkKeyBuilder.parseIdFromPath(processor);
- containerIds.add(zkProcessorId);
+ List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
+ for (String processorPid : currentProcessorsIds) {
+ containerIds.add(processorPid);
}
log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
@@ -217,8 +225,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
// start the barrier for the job model update
- ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
- barrier.start(nextJMVersion, currentProcessors);
+ BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
+ JOB_MODEL_UPGRADE_BARRIER);
+ barrier.start(nextJMVersion, currentProcessorsIds);
// publish new JobModel version
zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index fb63236..a44565c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,10 +19,7 @@
package org.apache.samza.zk;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.processor.SamzaContainerController;
@@ -31,27 +28,19 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
/**
* Method to instantiate an implementation of JobCoordinator
*
- * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+ * @param processorId - id of this processor
+ * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+ * @param containerController - controller to allow JobCoordinator control the SamzaContainer.
* @return An instance of IJobCoordinator
*/
@Override
public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) {
- JobConfig jobConfig = new JobConfig(config);
- String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
- ZkConfig zkConfig = new ZkConfig(config);
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
- ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
return new ZkJobCoordinator(
processorId,
- groupName,
config,
debounceTimer,
- new ZkUtils(
- new ZkKeyBuilder(groupName),
- zkClient,
- zkConfig.getZkConnectionTimeoutMs()
- ),
containerController);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 44f83e4..7452a97 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -26,7 +26,13 @@ import com.google.common.base.Strings;
* The following ZK hierarchy is maintained for Standalone jobs:
* <pre>
* - /
- * |- jobName-jobId/
+ * |- groupId/
+ * |- JobModelGeneration/
+ * |- jobModelVersion (data contains the version)
+ * |- jobModelUpgradeBarrier/ (contains barrier related data)
+ * |- jobModels/
+ * |- 1 (contains job model version 1 as data)
+ * |- 2
* |- processors/
* |- 00000001
* |- 00000002
@@ -44,7 +50,7 @@ public class ZkKeyBuilder {
private final String pathPrefix;
static final String PROCESSORS_PATH = "processors";
- public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
+ static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
public ZkKeyBuilder(String pathPrefix) {
if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -77,11 +83,11 @@ public class ZkKeyBuilder {
}
public String getJobModelVersionPath() {
- return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+ return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
}
public String getJobModelPathPrefix() {
- return String.format("/%s/jobModels", pathPrefix);
+ return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH, pathPrefix);
}
public String getJobModelPath(String jobModelVersion) {
@@ -89,6 +95,6 @@ public class ZkKeyBuilder {
}
public String getJobModelVersionBarrierPrefix(String barrierId) {
- return String.format("/%s/%s/versionBarriers", pathPrefix, barrierId);
+ return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, barrierId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index c2a8901..644864a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -83,9 +83,9 @@ public class ZkLeaderElector implements LeaderElector {
@Override
public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) {
- String currentPath = zkUtils.registerProcessorAndGetId(hostName + " " + processorIdStr);
+ String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
- List<String> children = zkUtils.getSortedActiveProcessors();
+ List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index d77aab2..fee8405 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -20,6 +20,7 @@
package org.apache.samza.zk;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -103,12 +104,11 @@ public class ZkUtils {
* @param data Object that should be written as data in the registered ephemeral ZK node
* @return String representing the absolute ephemeralPath of this client in the current session
*/
- public synchronized String registerProcessorAndGetId(final Object data) {
+ public synchronized String registerProcessorAndGetId(final ProcessorData data) {
if (ephemeralPath == null) {
- // TODO: Data should be more than just the hostname. Use Json serialized data
ephemeralPath =
zkClient.createEphemeralSequential(
- keyBuilder.getProcessorsPath() + "/", data);
+ keyBuilder.getProcessorsPath() + "/", data.toString());
LOG.info("newly generated path for " + data + " is " + ephemeralPath);
return ephemeralPath;
@@ -123,17 +123,59 @@ public class ZkUtils {
}
/**
- * Method is used to get the <i>sorted</i> list of currently active/registered processors
+ * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
*
* @return List of absolute ZK node paths
*/
- public List<String> getSortedActiveProcessors() {
- List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath());
- if (children.size() > 0) {
- Collections.sort(children);
- LOG.info("Found these children - " + children);
+ public List<String> getSortedActiveProcessorsZnodes() {
+ List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
+ if (znodeIds.size() > 0) {
+ Collections.sort(znodeIds);
+ LOG.info("Found these children - " + znodeIds);
}
- return children;
+ return znodeIds;
+ }
+
+ /**
+ * Method is used to read processor's data from the znode
+ * @param fullPath absolute path to the znode
+ * @return processor's data
+ */
+ String readProcessorData(String fullPath) {
+ String data = zkClient.<String>readData(fullPath, true);
+ if (data == null) {
+ throw new SamzaException(String.format("Cannot read ZK node:", fullPath));
+ }
+ return data;
+ }
+
+ /**
+ * Method is used to get the list of currently active/registered processor ids
+ * @return List of processorIds
+ */
+ public List<String> getSortedActiveProcessorsIDs() {
+ return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
+ }
+
+ /**
+ * Method is used to get the <i>sorted</i> list of processors ids for a given list of znodes
+ * @param znodeIds - list of relative paths of the children's znodes
+ * @return List of processor ids for a given list of znodes
+ */
+ public List<String> getActiveProcessorsIDs(List<String> znodeIds) {
+ String processorPath = keyBuilder.getProcessorsPath();
+ List<String> processorIds = new ArrayList<>(znodeIds.size());
+ if (znodeIds.size() > 0) {
+
+ for (String child : znodeIds) {
+ String fullPath = String.format("%s/%s", processorPath, child);
+ processorIds.add(readProcessorData(fullPath));
+ }
+
+ LOG.info("Found these children - " + znodeIds);
+ LOG.info("Found these processorIds - " + processorIds);
+ }
+ return processorIds;
}
/* Wrapper for standard I0Itec methods */
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index ef271f0..3134cfa 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -56,10 +56,11 @@ public class TestZkKeyBuilder {
ZkKeyBuilder builder = new ZkKeyBuilder("test");
- Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
- Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModelVersion", builder.getJobModelVersionPath());
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
String version = "2";
- Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
- Assert.assertEquals("/test/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix("testBarrier"));
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix(
+ "testBarrier"));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index c2a5a71..5aaee2a 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -100,7 +100,7 @@ public class TestZkLeaderElector {
ZkUtils mockZkUtils = mock(ZkUtils.class);
when(mockZkUtils.registerProcessorAndGetId(any())).
thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
- when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+ when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
@@ -123,7 +123,7 @@ public class TestZkLeaderElector {
public void testUnregisteredProcessorInLeaderElection() {
String processorId = "1";
ZkUtils mockZkUtils = mock(ZkUtils.class);
- when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+ when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
@@ -167,7 +167,7 @@ public class TestZkLeaderElector {
ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
- Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
+ Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
@Override
@@ -192,14 +192,14 @@ public class TestZkLeaderElector {
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
- Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
+ Assert.assertEquals(3, testZkUtils.getSortedActiveProcessorsZnodes().size());
// Clean up
zkUtils1.close();
zkUtils2.close();
zkUtils3.close();
- Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessors());
+ Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessorsZnodes());
}
@@ -223,7 +223,7 @@ public class TestZkLeaderElector {
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
- zkUtils1.registerProcessorAndGetId("processor1");
+ zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -241,7 +241,7 @@ public class TestZkLeaderElector {
// Processor-2
ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
- final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
+ final String path2 = zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -273,7 +273,7 @@ public class TestZkLeaderElector {
// Processor-3
ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3");
- zkUtils3.registerProcessorAndGetId("processor3");
+ zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -317,7 +317,7 @@ public class TestZkLeaderElector {
Assert.assertFalse(leaderElector2.amILeader());
Assert.assertFalse(leaderElector3.amILeader());
- List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
+ List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
Assert.assertEquals(3, currentActiveProcessors.size());
// Leader Failure
@@ -331,7 +331,7 @@ public class TestZkLeaderElector {
}
Assert.assertEquals(1, count.get());
- Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors());
+ Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessorsZnodes());
// Clean up
zkUtils2.close();
@@ -358,7 +358,7 @@ public class TestZkLeaderElector {
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
- zkUtils1.registerProcessorAndGetId("processor1");
+ zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -378,7 +378,7 @@ public class TestZkLeaderElector {
// Processor-2
ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
- zkUtils2.registerProcessorAndGetId("processor2");
+ zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -397,7 +397,7 @@ public class TestZkLeaderElector {
// Processor-3
ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3");
- final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
+ final String path3 = zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -451,7 +451,7 @@ public class TestZkLeaderElector {
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
- List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
+ List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
Assert.assertEquals(3, currentActiveProcessors.size());
zkUtils2.close();
@@ -464,7 +464,7 @@ public class TestZkLeaderElector {
}
Assert.assertEquals(1, count.get());
- Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors());
+ Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessorsZnodes());
// Clean up
zkUtils1.close();
http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 010fc19..b8dc295 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.zk;
+import java.util.List;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
@@ -73,7 +74,6 @@ public class TestZkUtils {
SESSION_TIMEOUT_MS);
zkUtils.connect();
-
}
@After
@@ -89,19 +89,37 @@ public class TestZkUtils {
@Test
public void testRegisterProcessorId() {
- String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0");
+ String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
// Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
- Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+ Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
}
@Test
public void testGetActiveProcessors() {
- Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
- zkUtils.registerProcessorAndGetId("processorData");
- Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
+ Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
+ zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
+ Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
+ }
+
+ @Test
+ public void testGetProcessorsIDs() {
+ Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
+ zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
+ List<String> l = zkUtils.getSortedActiveProcessorsIDs();
+ Assert.assertEquals(1, l.size());
+ new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+ l = zkUtils.getSortedActiveProcessorsIDs();
+ Assert.assertEquals(2, l.size());
+
+ ProcessorData pd = new ProcessorData(l.get(0));
+ Assert.assertEquals(" ID1 didn't match", "1", pd.getProcessorId());
+ Assert.assertEquals(" Host1 didn't match", "host1", pd.getHost());
+ pd = new ProcessorData(l.get(1));
+ Assert.assertEquals(" ID2 didn't match", "2", pd.getProcessorId());
+ Assert.assertEquals(" Host2 didn't match", "host2", pd.getHost());
}
@Test