You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/03/01 01:56:59 UTC

samza git commit: SAMZA-1103: ZkBarrier

Repository: samza
Updated Branches:
  refs/heads/master c58d74b35 -> 4d7b3b353


SAMZA-1103: ZkBarrier

SAMZA-1103: Barrier for JobModel upgrades. Processors may start using a new JobModel only after every processor has been notified about it.

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>

Reviewers: Fred Ji <fr...@yahoo.com>, Navina Ramesh <na...@apache.org>, Xiliu Liu <xi...@linkedin.com>

Closes #61 from sborya/ZkBarrier


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4d7b3b35
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4d7b3b35
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4d7b3b35

Branch: refs/heads/master
Commit: 4d7b3b3534ed804ad54227901bf3bbaff32814e1
Parents: c58d74b
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Feb 28 17:56:50 2017 -0800
Committer: navina <na...@apache.org>
Committed: Tue Feb 28 17:56:50 2017 -0800

----------------------------------------------------------------------
 .../samza/zk/BarrierForVersionUpgrade.java      |  46 +++++
 .../samza/zk/ScheduleAfterDebounceTime.java     |   8 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 166 +++++++++++++++++++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |   4 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  11 +-
 .../apache/samza/task/ReadableCoordinator.scala |   1 +
 .../zk/TestZkBarrierForVersionUpgrade.java      | 148 +++++++++++++++++
 .../apache/samza/zk/TestZkLeaderElector.java    |  11 +-
 8 files changed, 379 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..b2d80d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Barrier that lets processors switch to a newly published JobModel version only after every
+ * processor expected for that version has acknowledged it.
+ */
+public interface BarrierForVersionUpgrade {
+  /**
+   * Starts the barrier for {@code version}; usually called by the leader.
+   * @param version - the JobModel version for which the barrier is started.
+   * @param processorsNames - processors expected to join, as known when the JobModel was generated.
+   */
+  void startBarrier(String version,  List<String> processorsNames);
+
+  /**
+   * Called by a processor to mark itself ready to use the new version, and to be notified
+   * (through {@code callback}) once all processors expected by startBarrier have joined.
+   * See the implementation for the thread on which the callback runs.
+   * @param version of the JobModel this barrier is protecting.
+   * @param processorsName this processor's name, as it appears in the list passed to startBarrier.
+   * @param callback - invoked when the barrier is reached.
+   */
+  void waitForBarrier(String version, String processorsName, Runnable callback);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 0a4db6d..289d900 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -42,15 +42,17 @@ public class ScheduleAfterDebounceTime {
   public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
   public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
 
+  // Names of actions.
+  // When the same action is scheduled it needs to cancel the previous one.
+  // To accomplish that we keep the previous future in a map, keyed by the action name.
+
+  // Here we predefine some actions which are used in the ZK based standalone app.
   // Action name when the JobModel version changes
   public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
 
   // Action name when the Processor membership changes
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
 
-  // Action name when the Processor Data changes
-  public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
-
   public static final int DEBOUNCE_TIME_MS = 2000;
 
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..3ec87b0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZooKeeper-based {@link BarrierForVersionUpgrade}.
+ *
+ * <p>For each version the barrier uses these persistent nodes under the barrier prefix:
+ * <ul>
+ *   <li>{@code barrier_<version>/barrier_processors/<processorName>} - one child per processor that joined;</li>
+ *   <li>{@code barrier_<version>/barrier_done} - written with "done" once every expected processor joined.</li>
+ * </ul>
+ * The leader ({@link #startBarrier}) watches the processors node and writes "done"; each processor
+ * ({@link #waitForBarrier}) registers itself and watches the done node.
+ */
+public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private final static String BARRIER_DONE = "done";
+  private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  private final String barrierPrefix;
+
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+    this.zkUtils = zkUtils;
+    keyBuilder = zkUtils.getKeyBuilder();
+
+    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
+    this.debounceTimer = debounceTimer;
+  }
+
+  /** Root node of the barrier for the given JobModel version. */
+  private String getBarrierPath(String version) {
+    return String.format("%s/barrier_%s", barrierPrefix, version);
+  }
+
+  /** Node whose data is set to "done" once all expected processors joined. */
+  private String getBarrierDonePath(String version) {
+    return String.format("%s/barrier_done", getBarrierPath(version));
+  }
+
+  /** Parent node under which each joining processor creates a child named after itself. */
+  private String getBarrierProcessorsPath(String version) {
+    return String.format("%s/barrier_processors", getBarrierPath(version));
+  }
+
+  /**
+   * Creates the barrier nodes for {@code version} and watches for processors joining it. Once every
+   * name in {@code processorsNames} has joined, "done" is written to the barrier-done node.
+   */
+  @Override
+  public void startBarrier(String version, List<String> processorsNames) {
+    final String barrierPath = getBarrierPath(version);
+    final String barrierDonePath = getBarrierDonePath(version);
+    final String barrierProcessors = getBarrierProcessorsPath(version);
+
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+
+    // callback for when the barrier is reached
+    Runnable callback = new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+      }
+    };
+    ZkBarrierChangeHandler handler = new ZkBarrierChangeHandler(callback, processorsNames);
+
+    // subscribe for processor's list changes
+    LOG.info("Subscribing for child changes at " + barrierProcessors);
+    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, handler);
+
+    // ZK only reports changes made after the watch is set. Processors that joined before that moment
+    // (possibly all of them) would otherwise never complete the barrier, so check the current state too.
+    handler.checkAllJoined(barrierProcessors, zkUtils.getZkClient().getChildren(barrierProcessors));
+  }
+
+  /**
+   * Registers {@code processorsName} on the barrier for {@code version} and schedules {@code callback}
+   * on the debounce timer once the barrier-done node reads "done". Returns without blocking.
+   */
+  @Override
+  public void waitForBarrier(String version, String processorsName, Runnable callback) {
+    // if participant makes this call it means it has already stopped the old container and got the new job model.
+    final String barrierDonePath = getBarrierDonePath(version);
+    final String barrierProcessorThis = String.format("%s/%s", getBarrierProcessorsPath(version), processorsName);
+    final ZkBarrierReachedHandler handler = new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback);
+
+    // Subscribe BEFORE registering: this registration may be the last one, and if the leader wrote "done"
+    // before the watch existed, the notification would be lost and the callback would never run.
+    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, handler);
+
+    // update the barrier for this processor
+    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+
+    // The barrier may have been completed before the watch was installed; act on the current value as well.
+    handler.onBarrierData(zkUtils.getZkClient().readData(barrierDonePath, true));
+  }
+
+  /**
+   * Leader-side listener on the barrier processors node: writes the barrier-done marker (via
+   * {@code callback}) once every expected processor name is present among the children.
+   */
+  class ZkBarrierChangeHandler implements IZkChildListener {
+    final Runnable callback;
+    final List<String> names;
+    // guards against completing the barrier twice (ZK event thread vs. the initial check in startBarrier)
+    private final AtomicBoolean barrierReached = new AtomicBoolean(false);
+
+    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
+      this.callback = callback;
+      this.names = names;
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+      checkAllJoined(parentPath, currentChildren);
+    }
+
+    /**
+     * Runs {@code callback} (at most once) if {@code currentChildren} contains every expected name,
+     * and then stops listening on {@code parentPath}.
+     */
+    void checkAllJoined(String parentPath, List<String> currentChildren) {
+      if (currentChildren == null) {
+        LOG.info("Got handleChildChange with null currentChildren");
+        return;
+      }
+      LOG.info("list of children in the barrier = " + parentPath + ":" + String.join(",", currentChildren));
+      LOG.info("list of children to compare against = " + parentPath + ":" + String.join(",", names));
+
+      // check if all the names are in
+      for (String n : names) {
+        if (!currentChildren.contains(n)) {
+          LOG.info("node " + n + " is still not in the list ");
+          return;
+        }
+      }
+      if (barrierReached.compareAndSet(false, true)) {
+        LOG.info("ALl nodes reached the barrier");
+        zkUtils.getZkClient().unsubscribeChildChanges(parentPath, this);
+        callback.run(); // all the names have registered
+      }
+    }
+  }
+
+  /**
+   * Processor-side listener on the barrier-done node: once it reads "done", stops listening and
+   * schedules the processor's callback on the debounce timer.
+   */
+  class ZkBarrierReachedHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    private final String barrierPathDone;
+    private final Runnable callback;
+    // guards against scheduling the callback twice (ZK event thread vs. the initial read in waitForBarrier)
+    private final AtomicBoolean barrierReached = new AtomicBoolean(false);
+
+    public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
+      this.barrierPathDone = barrierPathDone;
+      this.callback = callback;
+      this.debounceTimer = debounceTimer;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data)
+        throws Exception {
+      LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + data);
+      onBarrierData(data);
+      // we do not need to resubscribe because, ZkClient library does it for us.
+    }
+
+    /** Schedules {@code callback} (at most once) if {@code data} is the "done" marker; tolerates null. */
+    void onBarrierData(Object data) {
+      if (BARRIER_DONE.equals(data) && barrierReached.compareAndSet(false, true)) {
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+        debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      }
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath)
+        throws Exception {
+      LOG.warn("barrier done got deleted at " + dataPath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index d6cb9f3..0a8f37e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -88,5 +88,7 @@ public class ZkKeyBuilder {
     return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
   }
 
-
+  public String getJobModelVersionBarrierPrefix() {
+    return "/" + pathPrefix + "/versionBarriers"; // root node under which the per-version barrier nodes live
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index b11e02f..320cd49 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.zk;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -27,10 +30,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -165,7 +164,7 @@ public class ZkUtils {
 
   /**
    * verify that given paths exist in ZK
-   * @param paths
+   * @param paths - paths to verify or create
    */
   public void makeSurePersistentPathsExists(String[] paths) {
     for (String path : paths) {
@@ -177,7 +176,7 @@ public class ZkUtils {
 
   /**
    * subscribe to the changes in the list of processors in ZK
-   * @param listener
+   * @param listener - will be called when a processor is added or removed.
    */
   public void subscribeToProcessorChange(IZkChildListener listener) {
     LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index 6e1134d..6c7641b 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -38,3 +38,4 @@ class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator {
   def requestedShutdownOnConsensus = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.CURRENT_TASK
   def requestedShutdownNow         = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER
 }
+

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..92cb2c9
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestZkBarrierForVersionUpgrade {
+  private static EmbeddedZookeeper zkServer = null;
+  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
+  private String testZkConnectionString = null;
+  private ZkUtils testZkUtils = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @Before
+  public void testSetup() {
+    testZkConnectionString = "localhost:" + zkServer.getPort();
+    try {
+      testZkUtils = getZkUtilsWithNewClient();
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests.. " + e);
+    }
+  }
+
+  @After
+  public void testTeardown() {
+    testZkUtils.deleteRoot();
+    testZkUtils.close();
+    testZkUtils = null;
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  /** Flags set by the barrier callbacks; volatile because callbacks run on the debounce timer's thread. */
+  private static class Status {
+    volatile boolean p1 = false;
+    volatile boolean p2 = false;
+    volatile boolean p3 = false;
+  }
+
+  @Test
+  public void testZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
+  }
+
+  @Test
+  public void testNegativeZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+    processors.add("p3");
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+
+    // p3 never joins, so the barrier must not complete and NO processor may be released.
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
+
+    // "p1 && p2 && p3" would pass trivially (p3 is never set); require that no callback fired at all.
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 || s.p2 || s.p3, 2, 100));
+  }
+
+  private ZkUtils getZkUtilsWithNewClient() {
+    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    return new ZkUtils(
+        KEY_BUILDER,
+        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        CONNECTION_TIMEOUT_MS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 6342fde..bfda464 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -18,6 +18,11 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -30,12 +35,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;