You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/03/01 01:56:59 UTC
samza git commit: SAMZA-1103: ZkBarrier
Repository: samza
Updated Branches:
refs/heads/master c58d74b35 -> 4d7b3b353
SAMZA-1103: ZkBarrier
SAMZA-1103: Barrier for JobModel upgrades. Processors may start using a new JobModel only after all of them have been notified about it.
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>
Reviewers: Fred Ji <fr...@yahoo.com>, Navina Ramesh <na...@apache.org>, Xiliu Liu <xi...@linkedin.com>
Closes #61 from sborya/ZkBarrier
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4d7b3b35
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4d7b3b35
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4d7b3b35
Branch: refs/heads/master
Commit: 4d7b3b3534ed804ad54227901bf3bbaff32814e1
Parents: c58d74b
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Feb 28 17:56:50 2017 -0800
Committer: navina <na...@apache.org>
Committed: Tue Feb 28 17:56:50 2017 -0800
----------------------------------------------------------------------
.../samza/zk/BarrierForVersionUpgrade.java | 46 +++++
.../samza/zk/ScheduleAfterDebounceTime.java | 8 +-
.../samza/zk/ZkBarrierForVersionUpgrade.java | 166 +++++++++++++++++++
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 4 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 11 +-
.../apache/samza/task/ReadableCoordinator.scala | 1 +
.../zk/TestZkBarrierForVersionUpgrade.java | 148 +++++++++++++++++
.../apache/samza/zk/TestZkLeaderElector.java | 11 +-
8 files changed, 379 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..b2d80d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Synchronization point that keeps processors from switching to a newly published JobModel until
+ * every processor that the JobModel was generated for has acknowledged it.
+ */
+public interface BarrierForVersionUpgrade {
+
+  /**
+   * Creates the barrier for one JobModel version. This is normally done by the leader.
+   *
+   * @param version the JobModel version guarded by this barrier.
+   * @param processorsNames the processors that existed when the JobModel was generated; the barrier
+   *                        is reached once every one of them has joined.
+   */
+  void startBarrier(String version, List<String> processorsNames);
+
+  /**
+   * Called by a processor to announce that it is ready to use {@code version}, and to wait on the
+   * barrier until all the other expected processors have joined too.
+   *
+   * @param version the JobModel version guarded by this barrier.
+   * @param processorsName this processor's name, spelled exactly as in the list given to startBarrier.
+   * @param callback invoked when the barrier has been reached.
+   */
+  void waitForBarrier(String version, String processorsName, Runnable callback);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 0a4db6d..289d900 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -42,15 +42,17 @@ public class ScheduleAfterDebounceTime {
public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
+ // Names of actions.
+ // When the same action is scheduled it needs to cancel the previous one.
+ // To accomplish that we keep the previous future in a map, keyed by the action name.
+
+ // Here we predefine some actions which are used in the ZK based standalone app.
// Action name when the JobModel version changes
public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
// Action name when the Processor membership changes
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
- // Action name when the Processor Data changes
- public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
-
public static final int DEBOUNCE_TIME_MS = 2000;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..3ec87b0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZooKeeper-based {@link BarrierForVersionUpgrade}.
+ *
+ * <p>For a given version the barrier uses these nodes under
+ * {@link ZkKeyBuilder#getJobModelVersionBarrierPrefix()}:
+ * <pre>
+ *   barrier_&lt;version&gt;/barrier_processors/&lt;processorName&gt;  one child per processor that joined
+ *   barrier_&lt;version&gt;/barrier_done                              written with "done" once all joined
+ * </pre>
+ * {@link #startBarrier} watches {@code barrier_processors} and writes {@code barrier_done};
+ * {@link #waitForBarrier} registers a processor's child and watches {@code barrier_done}.
+ */
+public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private final static String BARRIER_DONE = "done";
+  private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  private final String barrierPrefix;
+
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+    this.zkUtils = zkUtils;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
+    this.debounceTimer = debounceTimer;
+  }
+
+  /** Root node of the barrier for the given JobModel version. */
+  private String getBarrierPath(String version) {
+    return String.format("%s/barrier_%s", barrierPrefix, version);
+  }
+
+  /** Node into which {@link #BARRIER_DONE} is written once every expected processor has joined. */
+  private String getBarrierDonePath(String version) {
+    return String.format("%s/barrier_done", getBarrierPath(version));
+  }
+
+  /** Parent node under which each joining processor creates a child named after itself. */
+  private String getBarrierProcessorsPath(String version) {
+    return String.format("%s/barrier_processors", getBarrierPath(version));
+  }
+
+  @Override
+  public void startBarrier(String version, List<String> processorsNames) {
+    String barrierPath = getBarrierPath(version);
+    String barrierDonePath = getBarrierDonePath(version);
+    String barrierProcessors = getBarrierProcessorsPath(version);
+
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+
+    // callback for when the barrier is reached (the handler guarantees it runs at most once)
+    Runnable callback = () -> {
+      LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+      zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+    };
+
+    // subscribe for processor's list changes
+    LOG.info("Subscribing for child changes at " + barrierProcessors);
+    ZkBarrierChangeHandler handler = new ZkBarrierChangeHandler(callback, processorsNames);
+    // Processors that registered before this watch was set produce no child-change event, so evaluate
+    // the children that already exist; otherwise the barrier could wait forever.
+    List<String> currentChildren = zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, handler);
+    handler.checkChildren(barrierProcessors, currentChildren);
+  }
+
+  @Override
+  public void waitForBarrier(String version, String processorsName, Runnable callback) {
+    // if participant makes this call it means it has already stopped the old container and got the new job model.
+    String barrierDonePath = getBarrierDonePath(version);
+    String barrierProcessorThis = String.format("%s/%s", getBarrierProcessorsPath(version), processorsName);
+
+    // Subscribe BEFORE registering: if we registered first, the leader could write "done" before the
+    // watch exists and this processor would never be notified.
+    ZkBarrierReachedHandler handler = new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback);
+    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, handler);
+
+    // update the barrier for this processor
+    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+
+    // The barrier may already be done (written before the watch was set); check the current value too.
+    // The handler ignores a second "done" observation, so the callback is scheduled only once.
+    Object currentState = zkUtils.getZkClient().readData(barrierDonePath, true);
+    handler.onBarrierState(currentState);
+  }
+
+  /**
+   * Listens on the barrier's processors node and runs {@code callback} once every name in
+   * {@code names} is present among its children.
+   */
+  class ZkBarrierChangeHandler implements IZkChildListener {
+    final Runnable callback;
+    final List<String> names;
+    // Set once the callback has run, so later child changes do not re-run it.
+    private final AtomicBoolean barrierReached = new AtomicBoolean(false);
+
+    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
+      this.callback = callback;
+      this.names = names;
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+      checkChildren(parentPath, currentChildren);
+    }
+
+    /**
+     * Runs the callback (at most once) if all expected names are in {@code currentChildren}.
+     *
+     * @param parentPath the watched processors node.
+     * @param currentChildren its current children; {@code null} is logged and ignored.
+     */
+    void checkChildren(String parentPath, List<String> currentChildren) {
+      if (currentChildren == null) {
+        LOG.info("Got handleChildChange with null currentChildren");
+        return;
+      }
+      LOG.info("list of children in the barrier = " + parentPath + ":" + String.join(",", currentChildren));
+      LOG.info("list of children to compare against = " + parentPath + ":" + String.join(",", names));
+
+      // check if all the names are in
+      for (String n : names) {
+        if (!currentChildren.contains(n)) {
+          LOG.info("node " + n + " is still not in the list ");
+          return;
+        }
+      }
+      if (barrierReached.compareAndSet(false, true)) {
+        LOG.info("ALl nodes reached the barrier");
+        // The barrier is complete; further membership changes are irrelevant to it.
+        zkUtils.getZkClient().unsubscribeChildChanges(parentPath, this);
+        callback.run(); // all the names have registered
+      }
+    }
+  }
+
+  /**
+   * Listens on the barrier's done node. When the node holds {@link #BARRIER_DONE}, it unsubscribes
+   * itself and schedules {@code callback} on the debounce timer with zero delay.
+   */
+  class ZkBarrierReachedHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    private final String barrierPathDone;
+    private final Runnable callback;
+    // Ensures the callback is scheduled once even if both the watch and the explicit read see "done".
+    private final AtomicBoolean barrierReached = new AtomicBoolean(false);
+
+    public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
+      this.barrierPathDone = barrierPathDone;
+      this.callback = callback;
+      this.debounceTimer = debounceTimer;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data)
+        throws Exception {
+      LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + data);
+      onBarrierState(data);
+      // we do not need to resubscribe because, ZkClient library does it for us.
+    }
+
+    /**
+     * Schedules the callback if {@code data} marks the barrier as done; any other value is ignored.
+     *
+     * @param data current content of the done node; may be {@code null} (e.g. no data written yet).
+     */
+    void onBarrierState(Object data) {
+      // Null-safe and cast-free comparison: an empty or non-String payload must not throw.
+      if (BARRIER_DONE.equals(data) && barrierReached.compareAndSet(false, true)) {
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+        debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      }
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath)
+        throws Exception {
+      LOG.warn("barrier done got deleted at " + dataPath);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index d6cb9f3..0a8f37e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -88,5 +88,7 @@ public class ZkKeyBuilder {
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
-
+  public String getJobModelVersionBarrierPrefix() {
+    return "/" + pathPrefix + "/versionBarriers"; // parent of the per-version barrier_<version> nodes
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index b11e02f..320cd49 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,9 @@
package org.apache.samza.zk;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
@@ -27,10 +30,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
/**
* Util class to help manage Zk connection and ZkClient.
* It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -165,7 +164,7 @@ public class ZkUtils {
/**
* verify that given paths exist in ZK
- * @param paths
+ * @param paths - paths to verify or create
*/
public void makeSurePersistentPathsExists(String[] paths) {
for (String path : paths) {
@@ -177,7 +176,7 @@ public class ZkUtils {
/**
* subscribe to the changes in the list of processors in ZK
- * @param listener
+ * @param listener - will be called when a processor is added or removed.
*/
public void subscribeToProcessorChange(IZkChildListener listener) {
LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index 6e1134d..6c7641b 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -38,3 +38,4 @@ class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator {
def requestedShutdownOnConsensus = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.CURRENT_TASK
def requestedShutdownNow = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER
}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..92cb2c9
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.Assert;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestZkBarrierForVersionUpgrade {
+  private static EmbeddedZookeeper zkServer = null;
+  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
+  private String testZkConnectionString = null;
+  private ZkUtils testZkUtils = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  /**
+   * Records which processors' barrier callbacks have run. The callbacks are scheduled on the debounce
+   * timer's executor thread, while assertions run on the test thread, so the flags are volatile to
+   * make those writes visible.
+   */
+  private static class Status {
+    volatile boolean p1 = false;
+    volatile boolean p2 = false;
+    volatile boolean p3 = false;
+  }
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @Before
+  public void testSetup() {
+    testZkConnectionString = "localhost:" + zkServer.getPort();
+    try {
+      testZkUtils = getZkUtilsWithNewClient();
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests..");
+    }
+  }
+
+  @After
+  public void testTeardown() {
+    testZkUtils.deleteRoot();
+    testZkUtils.close();
+    testZkUtils = null;
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  /** Every expected processor joins, so every waiting processor's callback must eventually run. */
+  @Test
+  public void testZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<>();
+    processors.add("p1");
+    processors.add("p2");
+
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
+  }
+
+  /** Only two of the three expected processors join, so the barrier must never be reached. */
+  @Test
+  public void testNegativeZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<>();
+    processors.add("p1");
+    processors.add("p2");
+    processors.add("p3");
+
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
+
+    // "p1 && p2 && p3" would be vacuously false because p3 never joins; instead verify that none of the
+    // processors that did join were released, since the barrier is incomplete.
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 || s.p2 || s.p3, 2, 100));
+  }
+
+  private ZkUtils getZkUtilsWithNewClient() {
+    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    return new ZkUtils(
+        KEY_BUILDER,
+        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        CONNECTION_TIMEOUT_MS);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 6342fde..bfda464 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -18,6 +18,11 @@
*/
package org.apache.samza.zk;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -30,12 +35,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;