You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:51 UTC
[2/9] samza git commit: SAMZA-1102: Zk controller
SAMZA-1102: Zk controller
SAMZA-1102: Added ZKController and ZkControllerImpl
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: navina <na...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@apache.org>, Xinyu Liu <xi...@linkedin.com>
Closes #50 from sborya/ZkController
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f1bc1d0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f1bc1d0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f1bc1d0b
Branch: refs/heads/samza-fluent-api-v1
Commit: f1bc1d0b36242170930c0001c9efa7e5c24f8dd0
Parents: e6147fd
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Feb 23 14:02:05 2017 -0800
Committer: navina <na...@apache.org>
Committed: Thu Feb 23 14:02:05 2017 -0800
----------------------------------------------------------------------
.../processor/SamzaContainerController.java | 1 +
.../apache/samza/processor/StreamProcessor.java | 10 +-
.../java/org/apache/samza/zk/ZkController.java | 32 ++++
.../org/apache/samza/zk/ZkControllerImpl.java | 163 +++++++++++++++++++
.../apache/samza/zk/ZkControllerListener.java | 34 ++++
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 22 ++-
.../org/apache/samza/zk/ZkLeaderElector.java | 36 ++--
.../main/java/org/apache/samza/zk/ZkUtils.java | 49 ++++++
.../org/apache/samza/zk/TestZkKeyBuilder.java | 4 +-
.../apache/samza/zk/TestZkLeaderElector.java | 152 ++++++++++++++---
.../java/org/apache/samza/zk/TestZkUtils.java | 105 ++++++++++--
11 files changed, 549 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index d448d30..76e2053 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -60,6 +60,7 @@ public class SamzaContainerController {
* @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask}
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
+ * @param processorId Id of the processor
* @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
*/
public SamzaContainerController(
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5e90c56..4d3e8ab 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -94,9 +94,14 @@ public class StreamProcessor {
this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
}
+
/**
- * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+ *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
* using the provided {@link StreamTaskFactory}.
+ * @param processorId Id of this processor
+ * @param config the {@link Config} to use
+ * @param customMetricsReporters custom {@link MetricsReporter} instances, keyed by String
+ * @param streamTaskFactory task factory used to instantiate the task
*/
public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory) {
@@ -106,6 +111,9 @@ public class StreamProcessor {
/**
* Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
* using the "task.class" configuration instead of a task factory.
+ * @param processorId Id of this processor
+ * @param config the {@link Config} to use
+ * @param customMetricsReporters custom {@link MetricsReporter} instances, keyed by String
*/
public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) {
this(processorId, config, customMetricsReporters, (Object) null);
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
new file mode 100644
index 0000000..20c62cf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.samza.zk;
+
+/**
+ * API for the ZooKeeper-backed coordination operations; the per-method notes describe what ZkControllerImpl does.
+ */
+public interface ZkController {
+ void register(); // tries to become leader, then subscribes to JobModel version changes
+ boolean isLeader(); // true if the leader elector reports this processor as the leader
+ void notifyJobModelChange(String version); // forwards the version to ZkControllerListener.onNewJobModelAvailable
+ void stop(); // resigns leadership if currently leader, then closes ZkUtils
+ void listenToProcessorLiveness(); // subscribes to child changes on the processors path
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
new file mode 100644
index 0000000..70c8a37
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** {@link ZkController} implementation built on {@link ZkUtils} and {@link ZkLeaderElector}; events are reported to a {@link ZkControllerListener}. */
+public class ZkControllerImpl implements ZkController {
+ private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
+
+ private final String processorIdStr;
+ private final ZkUtils zkUtils;
+ private final ZkControllerListener zkControllerListener;
+ private final ZkLeaderElector leaderElector;
+ private final ScheduleAfterDebounceTime debounceTimer;
+
+ public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer,
+ ZkControllerListener zkControllerListener) {
+ this.processorIdStr = processorIdStr;
+ this.zkUtils = zkUtils;
+ this.zkControllerListener = zkControllerListener;
+ this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ onBecomeLeader();
+ }
+ }
+ );
+ this.debounceTimer = debounceTimer;
+
+ init();
+ }
+
+ private void init() { // ensures the processors, JobModel version and JobModel prefix paths exist in ZK
+ ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
+ zkUtils.makeSurePersistentPathsExists(
+ new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+ .getJobModelPathPrefix()});
+ }
+
+ private void onBecomeLeader() {
+
+ listenToProcessorLiveness(); // start watching the processors path for child changes
+
+ // inform the caller
+ zkControllerListener.onBecomeLeader();
+
+ }
+
+ @Override
+ public void register() {
+
+ // TODO - make a loop here with some number of attempts.
+ // possibly split into two methods - becomeLeader() and becomeParticipant()
+ leaderElector.tryBecomeLeader();
+
+ // subscribe to JobModel version updates
+ zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
+ }
+
+ @Override
+ public boolean isLeader() {
+ return leaderElector.amILeader();
+ }
+
+ @Override
+ public void notifyJobModelChange(String version) {
+ zkControllerListener.onNewJobModelAvailable(version);
+ }
+
+ @Override
+ public void stop() {
+ if (isLeader()) {
+ leaderElector.resignLeadership();
+ }
+ zkUtils.close();
+ }
+
+ @Override
+ public void listenToProcessorLiveness() {
+ zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer));
+ }
+
+ // Subscribed via listenToProcessorLiveness(), which this class calls when it becomes the leader
+ class ZkProcessorChangeHandler implements IZkChildListener {
+ private final ScheduleAfterDebounceTime debounceTimer;
+ public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+ this.debounceTimer = debounceTimer;
+ }
+ /**
+ * Called when the children of the given path changed.
+ * Logs the change and schedules zkControllerListener.onProcessorChange(currentChilds) after DEBOUNCE_TIME_MS.
+ * @param parentPath The parent path
+ * @param currentChilds The children or null if the root node (parent path) was deleted.
+ * @throws Exception permitted by the overridden listener signature; not thrown explicitly here
+ */
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ LOG.info(
+ "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: "
+ + currentChilds);
+ debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+ ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
+ }
+ }
+
+ class ZkJobModelVersionChangeHandler implements IZkDataListener {
+ private final ScheduleAfterDebounceTime debounceTimer;
+ public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+ this.debounceTimer = debounceTimer;
+ }
+ /**
+ * Called when the JobModel version data changes; logs it and schedules notifyJobModelChange with a debounce time of 0.
+ * @param dataPath path of the node whose data changed
+ * @param data new node data; cast to String and passed on as the JobModel version
+ * @throws Exception permitted by the overridden listener signature; not thrown explicitly here
+ */
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data="
+ + (String) data);
+
+ debounceTimer
+ .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
+ }
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ throw new SamzaException("version update path has been deleted!");
+ }
+ }
+
+ public void shutdown() { // stops the debounce scheduler and closes ZkUtils; NOTE(review): stop() also closes ZkUtils but never stops the scheduler
+ if (debounceTimer != null)
+ debounceTimer.stopScheduler();
+
+ if (zkUtils != null)
+ zkUtils.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
new file mode 100644
index 0000000..f7fedd7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Callbacks through which a ZkController implementation reports events to its caller.
+ */
+public interface ZkControllerListener {
+ void onBecomeLeader(); // invoked by ZkControllerImpl once it has started watching the processors path
+ void onProcessorChange(List<String> processorIds); // current children of the processors path, delivered after a debounce delay
+
+ void onNewJobModelAvailable(String version); // start job model update (stop current work)
+ void onNewJobModelConfirmed(String version); // start new work according to the new model
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 28344e9..d6cb9f3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -19,8 +19,8 @@
package org.apache.samza.zk;
-import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
+import com.google.common.base.Strings;
/**
* The following ZK hierarchy is maintained for Standalone jobs:
@@ -44,7 +44,7 @@ public class ZkKeyBuilder {
private final String pathPrefix;
static final String PROCESSORS_PATH = "processors";
- static final String PROCESSOR_ID_PREFIX = "processor-";
+ public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
public ZkKeyBuilder(String pathPrefix) {
if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -53,6 +53,10 @@ public class ZkKeyBuilder {
this.pathPrefix = pathPrefix.trim();
}
+ public String getRootPath() { // "/" followed by the trimmed path prefix
+ return "/" + pathPrefix;
+ }
+
public String getProcessorsPath() {
return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
}
@@ -71,4 +75,18 @@ public class ZkKeyBuilder {
return path.substring(path.lastIndexOf("/") + 1);
return null;
}
+
+ public String getJobModelVersionPath() { // "/<pathPrefix>/jobModelVersion"
+ return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+ }
+
+ public String getJobModelPathPrefix() { // "/<pathPrefix>/jobModels", the parent of the per-version JobModel paths
+ return String.format("/%s/jobModels", pathPrefix);
+ }
+
+ public String getJobModelPath(String jobModelVersion) { // "<jobModelPathPrefix>/<jobModelVersion>"
+ return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8cdf8fc..b9bdf11 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -50,25 +50,30 @@ public class ZkLeaderElector implements LeaderElector {
private final String hostName;
private AtomicBoolean isLeader = new AtomicBoolean(false);
- private final IZkDataListener zkLeaderElectionListener;
+ private final IZkDataListener previousProcessorChangeListener;
+ ZkLeaderElectorListener zkLeaderElectorListener;
private String currentSubscription = null;
private final Random random = new Random();
@VisibleForTesting
- ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) {
+ ZkLeaderElector(String processorIdStr,
+ ZkUtils zkUtils,
+ ZkLeaderElectorListener zkLeaderElectorListener,
+ IZkDataListener previousProcessorChangeListener) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
- this.zkLeaderElectionListener = leaderElectionListener;
this.keyBuilder = this.zkUtils.getKeyBuilder();
this.hostName = getHostName();
+ this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
+ if (previousProcessorChangeListener == null)
+ this.previousProcessorChangeListener = new PreviousProcessorChangeListener();
+ else
+ this.previousProcessorChangeListener = previousProcessorChangeListener;
}
- public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
- this.zkLeaderElectionListener = new ZkLeaderElectionListener();
- this.processorIdStr = processorIdStr;
- this.zkUtils = zkUtils;
- this.keyBuilder = this.zkUtils.getKeyBuilder();
- this.hostName = getHostName();
+ public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
+ this(processorIdStr, zkUtils, zkLeaderElectorListener, null); // null selects the default PreviousProcessorChangeListener
+
}
// TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,6 +86,10 @@ public class ZkLeaderElector implements LeaderElector {
}
}
+ public interface ZkLeaderElectorListener { // callback used to tell the owner of the elector about leadership
+ void onBecomingLeader(); // invoked from tryBecomeLeader() right after isLeader is set to true
+ }
+
@Override
public boolean tryBecomeLeader() {
String currentPath = zkUtils.registerProcessorAndGetId(hostName);
@@ -96,6 +105,7 @@ public class ZkLeaderElector implements LeaderElector {
if (index == 0) {
isLeader.getAndSet(true);
LOGGER.info(zLog("Eligible to become the leader!"));
+ zkLeaderElectorListener.onBecomingLeader(); // inform the caller
return true;
}
@@ -105,11 +115,13 @@ public class ZkLeaderElector implements LeaderElector {
if (!predecessor.equals(currentSubscription)) {
if (currentSubscription != null) {
LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
- zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+ zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+ previousProcessorChangeListener);
}
currentSubscription = predecessor;
LOGGER.info(zLog("Subscribing data change for " + predecessor));
- zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+ zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+ previousProcessorChangeListener);
}
/**
* Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
@@ -146,7 +158,7 @@ public class ZkLeaderElector implements LeaderElector {
}
// Only by non-leaders
- class ZkLeaderElectionListener implements IZkDataListener {
+ class PreviousProcessorChangeListener implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index d0a269d..b11e02f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
package org.apache.samza.zk;
+import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
@@ -58,6 +59,7 @@ public class ZkUtils {
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
+ private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
this.keyBuilder = zkKeyBuilder;
@@ -143,4 +145,51 @@ public class ZkUtils {
public void close() throws ZkInterruptedException {
zkClient.close();
}
+
+ /**
+ * Subscribes the given listener to data changes on the JobModel version path.
+ * @param dataListener listener registered with the ZkClient for the JobModel version path
+ */
+ public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
+ LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+ zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+ }
+
+ /**
+ * Reads the JobModel version from ZK.
+ * @return the data stored at the JobModel version path, read as a String
+ */
+ public String getJobModelVersion() {
+ return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+ }
+
+ /**
+ * Ensures each given path exists in ZK: a path that does not exist is created via zkClient.createPersistent(path, true).
+ * @param paths ZK paths to check and, if absent, create
+ */
+ public void makeSurePersistentPathsExists(String[] paths) {
+ for (String path : paths) {
+ if (!zkClient.exists(path)) {
+ zkClient.createPersistent(path, true);
+ }
+ }
+ }
+
+ /**
+ * Subscribes the given listener to child changes on the processors path.
+ * @param listener listener registered with the ZkClient for the processors path
+ */
+ public void subscribeToProcessorChange(IZkChildListener listener) {
+ LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+ zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+ }
+
+ public void deleteRoot() { // recursively deletes keyBuilder.getRootPath() if it exists in ZK; otherwise does nothing
+ String rootPath = keyBuilder.getRootPath();
+ if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+ LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
+ zkClient.deleteRecursive(rootPath);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index e04f7c9..8e048b2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -45,8 +45,8 @@ public class TestZkKeyBuilder {
@Test
public void testParseIdFromPath() {
Assert.assertEquals(
- ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1",
- ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1"));
+ "1",
+ ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1"));
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b999ec5..6342fde 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -70,9 +70,13 @@ public class TestZkLeaderElector {
}
}
+ public static class BooleanResult { // mutable flag that the listener callbacks in these tests set to true
+ public boolean res = false;
+ }
@After
public void testTeardown() {
+ testZkUtils.deleteRoot(); // recursively remove the ZK root path after each test
testZkUtils.close();
}
@@ -94,8 +98,17 @@ public class TestZkLeaderElector {
thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
- ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils);
- Assert.assertTrue(leaderElector.tryBecomeLeader());
+ BooleanResult isLeader = new BooleanResult();
+ ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader.res = true;
+ }
+ }
+ );
+ leaderElector.tryBecomeLeader();
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
}
@Test
@@ -104,7 +117,13 @@ public class TestZkLeaderElector {
ZkUtils mockZkUtils = mock(ZkUtils.class);
when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
- ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils);
+ ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ }
+ }
+ );
try {
leaderElector.tryBecomeLeader();
Assert.fail("Was expecting leader election to fail!");
@@ -118,29 +137,50 @@ public class TestZkLeaderElector {
*/
@Test
public void testLeaderElection() {
+ BooleanResult isLeader1 = new BooleanResult();
+ BooleanResult isLeader2 = new BooleanResult();
+ BooleanResult isLeader3 = new BooleanResult();
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector1 = new ZkLeaderElector(
- "1",
- zkUtils1);
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ }
+ );
// Processor-2
ZkUtils zkUtils2 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector2 = new ZkLeaderElector(
- "2",
- zkUtils2);
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ }
+ );
// Processor-3
ZkUtils zkUtils3 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector3 = new ZkLeaderElector(
- "3",
- zkUtils3);
+ ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ });
Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
- Assert.assertTrue(leaderElector1.tryBecomeLeader());
- Assert.assertFalse(leaderElector2.tryBecomeLeader());
- Assert.assertFalse(leaderElector3.tryBecomeLeader());
+ leaderElector1.tryBecomeLeader();
+ leaderElector2.tryBecomeLeader();
+ leaderElector3.tryBecomeLeader();
+
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
@@ -166,16 +206,26 @@ public class TestZkLeaderElector {
final CountDownLatch electionLatch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger(0);
+ BooleanResult isLeader1 = new BooleanResult();
+ BooleanResult isLeader2 = new BooleanResult();
+ BooleanResult isLeader3 = new BooleanResult();
+
+
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient();
zkUtils1.registerProcessorAndGetId("processor1");
ZkLeaderElector leaderElector1 = new ZkLeaderElector(
"1",
zkUtils1,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
-
}
@Override
@@ -191,6 +241,12 @@ public class TestZkLeaderElector {
ZkLeaderElector leaderElector2 = new ZkLeaderElector(
"2",
zkUtils2,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -223,6 +279,12 @@ public class TestZkLeaderElector {
ZkLeaderElector leaderElector3 = new ZkLeaderElector(
"3",
zkUtils3,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -236,9 +298,12 @@ public class TestZkLeaderElector {
});
// Join Leader Election
- Assert.assertTrue(leaderElector1.tryBecomeLeader());
- Assert.assertFalse(leaderElector2.tryBecomeLeader());
- Assert.assertFalse(leaderElector3.tryBecomeLeader());
+ leaderElector1.tryBecomeLeader();
+ leaderElector2.tryBecomeLeader();
+ leaderElector3.tryBecomeLeader();
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
Assert.assertTrue(leaderElector1.amILeader());
Assert.assertFalse(leaderElector2.amILeader());
@@ -278,12 +343,22 @@ public class TestZkLeaderElector {
final CountDownLatch electionLatch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger(0);
+ BooleanResult isLeader1 = new BooleanResult();
+ BooleanResult isLeader2 = new BooleanResult();
+ BooleanResult isLeader3 = new BooleanResult();
+
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient();
zkUtils1.registerProcessorAndGetId("processor1");
ZkLeaderElector leaderElector1 = new ZkLeaderElector(
"1",
zkUtils1,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -302,6 +377,12 @@ public class TestZkLeaderElector {
ZkLeaderElector leaderElector2 = new ZkLeaderElector(
"2",
zkUtils2,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -320,6 +401,12 @@ public class TestZkLeaderElector {
ZkLeaderElector leaderElector3 = new ZkLeaderElector(
"3",
zkUtils3,
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ },
new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -347,9 +434,12 @@ public class TestZkLeaderElector {
});
// Join Leader Election
- Assert.assertTrue(leaderElector1.tryBecomeLeader());
- Assert.assertFalse(leaderElector2.tryBecomeLeader());
- Assert.assertFalse(leaderElector3.tryBecomeLeader());
+ leaderElector1.tryBecomeLeader();
+ leaderElector2.tryBecomeLeader();
+ leaderElector3.tryBecomeLeader();
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
Assert.assertEquals(3, currentActiveProcessors.size());
@@ -373,15 +463,29 @@ public class TestZkLeaderElector {
@Test
public void testAmILeader() {
+ BooleanResult isLeader1 = new BooleanResult();
+ BooleanResult isLeader2 = new BooleanResult();
// Processor-1
ZkLeaderElector leaderElector1 = new ZkLeaderElector(
"1",
- getZkUtilsWithNewClient());
+ getZkUtilsWithNewClient(),
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ });
// Processor-2
ZkLeaderElector leaderElector2 = new ZkLeaderElector(
"2",
- getZkUtilsWithNewClient());
+ getZkUtilsWithNewClient(),
+ new ZkLeaderElector.ZkLeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ });
// Before Leader Election
Assert.assertFalse(leaderElector1.amILeader());
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 855d29d..b719e28 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.samza.zk;
+import java.util.function.BooleanSupplier;
+import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -32,10 +34,10 @@ import org.junit.Test;
public class TestZkUtils {
private static EmbeddedZookeeper zkServer = null;
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
- private ZkConnection zkConnection = null;
private ZkClient zkClient = null;
private static final int SESSION_TIMEOUT_MS = 20000;
private static final int CONNECTION_TIMEOUT_MS = 10000;
+ private ZkUtils zkUtils;
@BeforeClass
public static void setup() throws InterruptedException {
@@ -57,11 +59,21 @@ public class TestZkUtils {
} catch (ZkNodeExistsException e) {
// Do nothing
}
+
+
+ zkUtils = new ZkUtils(
+ KEY_BUILDER,
+ zkClient,
+ SESSION_TIMEOUT_MS);
+
+ zkUtils.connect();
+
}
@After
public void testTeardown() {
+ // Runs after every test: close the ZkUtils built in the setup code above, then the ZkClient it was constructed with.
+ // NOTE(review): if ZkUtils.close() already closes the wrapped client, the second close is redundant - confirm.
+ zkUtils.close();
zkClient.close();
}
@@ -72,34 +84,91 @@ public class TestZkUtils {
@Test
public void testRegisterProcessorId() {
- ZkUtils utils = new ZkUtils(
- KEY_BUILDER,
- zkClient,
- SESSION_TIMEOUT_MS);
- utils.connect();
- String assignedPath = utils.registerProcessorAndGetId("0.0.0.0");
+ // Registers through the shared zkUtils field; the returned path must start with the processors path.
+ String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0");
Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
// Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
- Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+ Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
- utils.close();
}
@Test
public void testGetActiveProcessors() {
- ZkUtils utils = new ZkUtils(
- KEY_BUILDER,
- zkClient,
- SESSION_TIMEOUT_MS);
- utils.connect();
+ // The sorted active-processor list is expected to be empty before any registration
+ // and to contain exactly one entry after a single processor registers.
+ Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
+ zkUtils.registerProcessorAndGetId("processorData");
- Assert.assertEquals(0, utils.getSortedActiveProcessors().size());
- utils.registerProcessorAndGetId("processorData");
+ Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
- Assert.assertEquals(1, utils.getSortedActiveProcessors().size());
+ }
- utils.close();
+ /**
+  * Verifies that an {@link IZkDataListener} subscribed through the ZkClient is notified when
+  * data is written to the job-model-version path and to the processors path.
+  */
+ @Test
+ public void testSubscribeToJobModelVersionChange() {
+
+   ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+   String root = keyBuilder.getRootPath();
+   // start from a clean tree so the existence checks below are meaningful
+   zkClient.deleteRecursive(root);
+
+   // Holder for the most recent value delivered to the listener.
+   class Result {
+     // volatile: updated from the listener callback while the test thread polls it,
+     // so the write must be guaranteed visible to the reader.
+     private volatile String res = "";
+     public String getRes() {
+       return res;
+     }
+     public void updateRes(String newRes) {
+       res = newRes;
+     }
+   }
+
+   Assert.assertFalse(zkUtils.exists(root));
+
+   // create the paths
+   zkUtils.makeSurePersistentPathsExists(
+       new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
+   Assert.assertTrue(zkUtils.exists(root));
+   Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
+   Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
+
+   final Result res = new Result();
+   // define the callback
+   IZkDataListener dataListener = new IZkDataListener() {
+     @Override
+     public void handleDataChange(String dataPath, Object data)
+         throws Exception {
+       res.updateRes((String) data);
+     }
+
+     @Override
+     public void handleDataDeleted(String dataPath)
+         throws Exception {
+       // NOTE(review): presumably invoked on a ZkClient event thread, in which case this
+       // AssertionError may be logged rather than fail the test - confirm.
+       Assert.fail("Data wasn't deleted;");
+     }
+   };
+   // subscribe the same listener to both paths
+   zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+   zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);
+   // update
+   zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");
+
+   // verify: notification is asynchronous, so poll with back-off
+   Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));
+
+   // update again, this time on the processors path
+   zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
+
+   Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
}
+ /**
+  * Polls {@code cond}, sleeping between attempts with a delay that doubles each time, until the
+  * condition holds or the delay reaches {@code maxDelayMs}.
+  *
+  * @param cond condition to poll
+  * @param startDelayMs first sleep in milliseconds; values below 1 are treated as 1 so the delay can grow
+  * @param maxDelayMs polling stops once the next sleep would be at least this many milliseconds
+  * @return {@code true} if the condition was observed to hold; {@code false} if it never held
+  *         or the calling thread was interrupted (its interrupt status is preserved)
+  */
+ public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
+   // A zero delay never grows when doubled (endless loop) and Thread.sleep rejects negative values.
+   long delay = Math.max(1L, startDelayMs);
+   while (delay < maxDelayMs) {
+     if (cond.getAsBoolean()) {
+       return true;
+     }
+     try {
+       Thread.sleep(delay);
+     } catch (InterruptedException e) {
+       // Restore the interrupt status so the caller can still observe the interruption.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+     delay *= 2;
+   }
+   // Check once more: covers the final sleep and the case where the loop body never ran.
+   return cond.getAsBoolean();
+ }
}