You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/08/23 00:56:04 UTC

samza git commit: SAMZA-1382: added Zk communication protocol version verification

Repository: samza
Updated Branches:
  refs/heads/master 67f7214aa -> 80d82b6a5


SAMZA-1382: added Zk communication protocol version verification

Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #255 from sborya/ZkCommunicationVersion


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/80d82b6a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/80d82b6a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/80d82b6a

Branch: refs/heads/master
Commit: 80d82b6a51f6dc0b65f766b63704b30c780073ac
Parents: 67f7214
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Aug 22 17:55:54 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Aug 22 17:55:54 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |  2 +-
 .../org/apache/samza/zk/ZkControllerImpl.java   | 19 ++++++-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  2 +-
 .../org/apache/samza/zk/ZkProcessorLatch.java   |  4 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 41 +++++++++++++-
 .../apache/samza/zk/TestZkLeaderElector.java    |  4 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   | 57 ++++++++++++++++++--
 7 files changed, 115 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 3257ee1..abea299 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -83,7 +83,7 @@ public class ZkBarrierForVersionUpgrade {
   public void create(final String version, List<String> participants) {
     String barrierRoot = keyBuilder.getBarrierRoot();
     String barrierParticipantsPath = keyBuilder.getBarrierParticipantsPath(version);
-    zkUtils.makeSurePersistentPathsExists(new String[]{
+    zkUtils.validatePaths(new String[]{
         barrierRoot,
         keyBuilder.getBarrierPath(version),
         barrierParticipantsPath,

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 3af5042..6305616 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -46,8 +46,8 @@ public class ZkControllerImpl implements ZkController {
 
   private void init() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
-    zkUtils.makeSurePersistentPathsExists(
-        new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+
+    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
             .getJobModelPathPrefix()});
   }
 
@@ -57,6 +57,21 @@ public class ZkControllerImpl implements ZkController {
     // possibly split into two method - becomeLeader() and becomeParticipant()
     zkLeaderElector.tryBecomeLeader();
 
+    // make sure we are connecting to a job that uses the same ZK communication protocol version.
+    try {
+      zkUtils.validateZkVersion();
+    } catch (SamzaException e) {
+      // IMPORTANT: A version mismatch means we are trying to join a job started by processors with a different version.
+      // If there are no processors running, this is the place to do the migration to the new
+      // ZK structure.
+      // If some processors are running, then this processor should fail with an error to tell the user to stop all
+      // the processors before upgrading to this new version.
+      // TODO migration here
+      // for now we just rethrow the exception
+      throw e;
+    }
+
+
     // subscribe to JobModel version updates
     zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 97430cb..f4c1e94 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -63,7 +63,7 @@ public class ZkLeaderElector implements LeaderElector {
     this.hostName = getHostName();
     this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
 
-    zkUtils.makeSurePersistentPathsExists(new String[]{keyBuilder.getProcessorsPath()});
+    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
index 166c627..77defa4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
@@ -46,8 +46,8 @@ public class ZkProcessorLatch implements Latch {
     ZkKeyBuilder keyBuilder = this.zkUtils.getKeyBuilder();
 
     latchPath = String.format("%s/%s", keyBuilder.getRootPath(), LATCH_PATH + "_" + latchId);
-    // TODO: Verify that makeSurePersistentPathsExists doesn't fail with exceptions
-    zkUtils.makeSurePersistentPathsExists(new String[] {latchPath});
+    // TODO: Verify that validatePaths doesn't fail with exceptions
+    zkUtils.validatePaths(new String[] {latchPath});
     targetPath =  String.format("%s/%010d", latchPath, size - 1);
 
     LOGGER.debug("ZkProcessorLatch targetPath " + targetPath);

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 5df7114..6ca9052 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -32,7 +32,9 @@ import java.util.stream.Collectors;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
@@ -74,6 +76,8 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
+  /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
+
 
   private final ZkClient zkClient;
   private volatile String ephemeralPath = null;
@@ -438,11 +442,46 @@ public class ZkUtils {
         "(actual data version after update = " + stat.getVersion() + ")");
   }
 
+  // validate that Zk protocol currently used by the job is the same as in this participant
+  public void validateZkVersion() {
+
+    // Version of the protocol is written into root znode. If root does not exist yet we need to create one.
+    String rootPath = keyBuilder.getRootPath();
+    if (!zkClient.exists(rootPath)) {
+      try {
+        // attempt to create the root with the correct version
+        zkClient.createPersistent(rootPath, ZK_PROTOCOL_VERSION);
+        LOG.info("Created zk root node: " + rootPath + " with zk version " + ZK_PROTOCOL_VERSION);
+        return;
+      } catch (ZkNodeExistsException e) {
+        // ignoring
+        LOG.warn("root path " + rootPath + " already exists.");
+      }
+    }
+    // if exists, verify the version
+    Stat stat = new Stat();
+    String version = (String) zkClient.readData(rootPath, stat);
+    if (version == null) {
+      // for backward compatibility, if no value - assume 1.0
+      try {
+        zkClient.writeData(rootPath, "1.0", stat.getVersion());
+      } catch (ZkBadVersionException e) {
+        // if the write failed with ZkBadVersionException it means someone else already wrote a version, so we can ignore it.
+      }
+      // re-read the updated version
+      version = (String) zkClient.readData(rootPath);
+    }
+    LOG.info("Current version for zk root node: " + rootPath + " is " + version + ", expected version is " + ZK_PROTOCOL_VERSION);
+    if (!version.equals(ZK_PROTOCOL_VERSION)) {
+      throw new SamzaException("ZK Protocol mismatch. Expected " + ZK_PROTOCOL_VERSION + "; found " + version);
+    }
+  }
+
   /**
    * verify that given paths exist in ZK
    * @param paths - paths to verify or create
    */
-  public void makeSurePersistentPathsExists(String[] paths) {
+  public void validatePaths(String[] paths) {
     for (String path : paths) {
       if (!zkClient.exists(path)) {
         zkClient.createPersistent(path, true);

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 3ff9175..010d138 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -100,7 +100,7 @@ public class TestZkLeaderElector {
     when(mockZkUtils.registerProcessorAndGetId(any())).
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
     when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
-    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+    Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
 
     ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
     when(kb.getProcessorsPath()).thenReturn("");
@@ -119,7 +119,7 @@ public class TestZkLeaderElector {
     String processorId = "1";
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
-    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+    Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
 
     ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
     when(kb.getProcessorsPath()).thenReturn("");

http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b5953d1..3c8f67e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.zk;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
@@ -45,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -116,12 +119,58 @@ public class TestZkUtils {
   }
 
   @Test
+  public void testZKProtocolVersion() {
+    // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION
+    ZkLeaderElector le = new ZkLeaderElector("1", zkUtils);
+    ZkControllerImpl zkController = new ZkControllerImpl("1", zkUtils, null, le);
+    zkController.register();
+    String root = zkUtils.getKeyBuilder().getRootPath();
+    String ver = (String) zkUtils.getZkClient().readData(root);
+    Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
+
+    // do it again (in case the original value was null)
+    zkController = new ZkControllerImpl("1", zkUtils, null, le);
+    zkController.register();
+    ver = (String) zkUtils.getZkClient().readData(root);
+    Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
+
+    // now negative case
+    zkUtils.getZkClient().writeData(root, "2.0");
+    try {
+      zkController = new ZkControllerImpl("1", zkUtils, null, le);
+      zkController.register();
+      Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0");
+    } catch (SamzaException e) {
+      // expected
+    }
+
+    // validate future values, let's say that current version should be 3.0
+    try {
+      Field f = zkUtils.getClass().getDeclaredField("ZK_PROTOCOL_VERSION");
+      FieldUtils.removeFinalModifier(f);
+      f.set(null, "3.0");
+    } catch (Exception e) {
+      System.out.println(e);
+      Assert.fail();
+    }
+
+    try {
+      zkController = new ZkControllerImpl("1", zkUtils, null, le);
+      zkController.register();
+      Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0");
+    } catch (SamzaException e) {
+      // expected
+    }
+  }
+
+  @Test
   public void testGetProcessorsIDs() {
     Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
     zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
     List<String> l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(1, l.size());
-    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(
+        new ProcessorData("host2", "2"));
     l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(2, l.size());
 
@@ -149,8 +198,7 @@ public class TestZkUtils {
     Assert.assertFalse(zkUtils.exists(root));
 
     // create the paths
-    zkUtils.makeSurePersistentPathsExists(
-        new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
+    zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
     Assert.assertTrue(zkUtils.exists(root));
     Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
     Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
@@ -213,8 +261,7 @@ public class TestZkUtils {
     String version = "1";
     String oldVersion = "0";
 
-    zkUtils.makeSurePersistentPathsExists(
-        new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+    zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
 
     zkUtils.publishJobModelVersion(oldVersion, version);
     Assert.assertEquals(version, zkUtils.getJobModelVersion());