You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:58 UTC
[17/26] samza git commit: SAMZA-1382: added Zk communication protocol
version verification
SAMZA-1382: added Zk communication protocol version verification
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #255 from sborya/ZkCommunicationVersion
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/80d82b6a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/80d82b6a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/80d82b6a
Branch: refs/heads/0.14.0
Commit: 80d82b6a51f6dc0b65f766b63704b30c780073ac
Parents: 67f7214
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Aug 22 17:55:54 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Aug 22 17:55:54 2017 -0700
----------------------------------------------------------------------
.../samza/zk/ZkBarrierForVersionUpgrade.java | 2 +-
.../org/apache/samza/zk/ZkControllerImpl.java | 19 ++++++-
.../org/apache/samza/zk/ZkLeaderElector.java | 2 +-
.../org/apache/samza/zk/ZkProcessorLatch.java | 4 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 41 +++++++++++++-
.../apache/samza/zk/TestZkLeaderElector.java | 4 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 57 ++++++++++++++++++--
7 files changed, 115 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 3257ee1..abea299 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -83,7 +83,7 @@ public class ZkBarrierForVersionUpgrade {
public void create(final String version, List<String> participants) {
String barrierRoot = keyBuilder.getBarrierRoot();
String barrierParticipantsPath = keyBuilder.getBarrierParticipantsPath(version);
- zkUtils.makeSurePersistentPathsExists(new String[]{
+ zkUtils.validatePaths(new String[]{
barrierRoot,
keyBuilder.getBarrierPath(version),
barrierParticipantsPath,
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 3af5042..6305616 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -46,8 +46,8 @@ public class ZkControllerImpl implements ZkController {
private void init() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
- zkUtils.makeSurePersistentPathsExists(
- new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+
+ zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
.getJobModelPathPrefix()});
}
@@ -57,6 +57,21 @@ public class ZkControllerImpl implements ZkController {
// possibly split into two method - becomeLeader() and becomeParticipant()
zkLeaderElector.tryBecomeLeader();
+ // make sure we are connecting to a job that uses the same ZK communication protocol version.
+ try {
+ zkUtils.validateZkVersion();
+ } catch (SamzaException e) {
+ // IMPORTANT: A version mismatch means we are trying to join a job that was started by processors with a different version.
+ // If there are no processors running, this is the place to do the migration to the new
+ // ZK structure.
+ // If some processors are running, then this processor should fail with an error to tell the user to stop all
+ // the processors before upgrading to this new version.
+ // TODO migration here
+ // for now we just rethrow the exception
+ throw e;
+ }
+
+
// subscribe to JobModel version updates
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 97430cb..f4c1e94 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -63,7 +63,7 @@ public class ZkLeaderElector implements LeaderElector {
this.hostName = getHostName();
this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
- zkUtils.makeSurePersistentPathsExists(new String[]{keyBuilder.getProcessorsPath()});
+ zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
index 166c627..77defa4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
@@ -46,8 +46,8 @@ public class ZkProcessorLatch implements Latch {
ZkKeyBuilder keyBuilder = this.zkUtils.getKeyBuilder();
latchPath = String.format("%s/%s", keyBuilder.getRootPath(), LATCH_PATH + "_" + latchId);
- // TODO: Verify that makeSurePersistentPathsExists doesn't fail with exceptions
- zkUtils.makeSurePersistentPathsExists(new String[] {latchPath});
+ // TODO: Verify that validatePaths doesn't fail with exceptions
+ zkUtils.validatePaths(new String[] {latchPath});
targetPath = String.format("%s/%010d", latchPath, size - 1);
LOGGER.debug("ZkProcessorLatch targetPath " + targetPath);
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 5df7114..6ca9052 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -32,7 +32,9 @@ import java.util.stream.Collectors;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.job.model.JobModel;
@@ -74,6 +76,8 @@ import org.slf4j.LoggerFactory;
*/
public class ZkUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
+ /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
+
private final ZkClient zkClient;
private volatile String ephemeralPath = null;
@@ -438,11 +442,46 @@ public class ZkUtils {
"(actual data version after update = " + stat.getVersion() + ")");
}
+  /**
+   * Validates that the ZK protocol version recorded in the job's root znode is the same as the
+   * version used by this participant ({@link #ZK_PROTOCOL_VERSION}).
+   *
+   * <p>If the root znode does not exist, it is created with the current version and the method returns.
+   * If it exists but holds no data, "1.0" is written to it (for backward compatibility) before the
+   * comparison is made.
+   *
+   * @throws SamzaException if the version found in the root znode differs from {@link #ZK_PROTOCOL_VERSION}
+   */
+  public void validateZkVersion() {
+    // Version of the protocol is written into root znode. If root does not exist yet we need to create one.
+    String rootPath = keyBuilder.getRootPath();
+    if (!zkClient.exists(rootPath)) {
+      try {
+        // attempt to create the root with the correct version
+        zkClient.createPersistent(rootPath, ZK_PROTOCOL_VERSION);
+        LOG.info("Created zk root node: " + rootPath + " with zk version " + ZK_PROTOCOL_VERSION);
+        return;
+      } catch (ZkNodeExistsException e) {
+        // The node appeared between the exists() check and the create; fall through and verify its version.
+        LOG.warn("root path " + rootPath + " already exists.");
+      }
+    }
+    // if exists, verify the version
+    Stat stat = new Stat();
+    String version = (String) zkClient.readData(rootPath, stat);
+    if (version == null) {
+      // for backward compatibility, if no value - assume 1.0
+      try {
+        // conditional write: only succeeds if the znode is unchanged since the read above
+        zkClient.writeData(rootPath, "1.0", stat.getVersion());
+      } catch (ZkBadVersionException ignored) {
+        // if the write failed with ZkBadVersionException it means someone else already wrote a version, so we can ignore it.
+      }
+      // re-read the updated version
+      version = (String) zkClient.readData(rootPath);
+    }
+    LOG.info("Current version for zk root node: " + rootPath + " is " + version + ", expected version is " + ZK_PROTOCOL_VERSION);
+    // Constant-first comparison: if the znode still has no data after the re-read, report it as a
+    // protocol mismatch instead of failing with a NullPointerException.
+    if (!ZK_PROTOCOL_VERSION.equals(version)) {
+      throw new SamzaException("ZK Protocol mismatch. Expected " + ZK_PROTOCOL_VERSION + "; found " + version);
+    }
+  }
+
/**
* verify that given paths exist in ZK
* @param paths - paths to verify or create
*/
- public void makeSurePersistentPathsExists(String[] paths) {
+ public void validatePaths(String[] paths) {
for (String path : paths) {
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 3ff9175..010d138 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -100,7 +100,7 @@ public class TestZkLeaderElector {
when(mockZkUtils.registerProcessorAndGetId(any())).
thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
- Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+ Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
when(kb.getProcessorsPath()).thenReturn("");
@@ -119,7 +119,7 @@ public class TestZkLeaderElector {
String processorId = "1";
ZkUtils mockZkUtils = mock(ZkUtils.class);
when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
- Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+ Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
when(kb.getProcessorsPath()).thenReturn("");
http://git-wip-us.apache.org/repos/asf/samza/blob/80d82b6a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b5953d1..3c8f67e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.zk;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -30,6 +31,7 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
@@ -45,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+
public class TestZkUtils {
private static EmbeddedZookeeper zkServer = null;
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -116,12 +119,58 @@ public class TestZkUtils {
}
@Test
+  public void testZKProtocolVersion() {
+    // Exercises the protocol-version check performed by ZkControllerImpl.register():
+    // a fresh root gets the current version, a repeated register keeps it, and a mismatch is rejected.
+
+    // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION
+    ZkLeaderElector le = new ZkLeaderElector("1", zkUtils);
+    ZkControllerImpl zkController = new ZkControllerImpl("1", zkUtils, null, le);
+    zkController.register();
+    String root = zkUtils.getKeyBuilder().getRootPath();
+    String ver = (String) zkUtils.getZkClient().readData(root);
+    Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
+
+    // do it again (in case original value was null)
+    zkController = new ZkControllerImpl("1", zkUtils, null, le);
+    zkController.register();
+    ver = (String) zkUtils.getZkClient().readData(root);
+    Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
+
+    // now negative case
+    zkUtils.getZkClient().writeData(root, "2.0");
+    try {
+      zkController = new ZkControllerImpl("1", zkUtils, null, le);
+      zkController.register();
+      Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0");
+    } catch (SamzaException e) {
+      // expected
+    }
+
+    // validate future values, let's say that current version should be 3.0
+    // NOTE(review): ZK_PROTOCOL_VERSION is a static final String initialized from a literal, i.e. a
+    // compile-time constant that javac inlines at its use sites. This reflective overwrite therefore
+    // probably does not change the value validateZkVersion() compares against, and the check below
+    // may still be exercising "2.0 vs 1.0" - TODO confirm.
+    try {
+      Field f = zkUtils.getClass().getDeclaredField("ZK_PROTOCOL_VERSION");
+      FieldUtils.removeFinalModifier(f);
+      f.set(null, "3.0");
+    } catch (Exception e) {
+      // surface the cause in the test report rather than on stdout
+      Assert.fail("Failed to override ZK_PROTOCOL_VERSION via reflection: " + e);
+    }
+
+    try {
+      zkController = new ZkControllerImpl("1", zkUtils, null, le);
+      zkController.register();
+      Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0");
+    } catch (SamzaException e) {
+      // expected
+    }
+  }
+
+ @Test
public void testGetProcessorsIDs() {
Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
List<String> l = zkUtils.getSortedActiveProcessorsIDs();
Assert.assertEquals(1, l.size());
- new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+ new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(
+ new ProcessorData("host2", "2"));
l = zkUtils.getSortedActiveProcessorsIDs();
Assert.assertEquals(2, l.size());
@@ -149,8 +198,7 @@ public class TestZkUtils {
Assert.assertFalse(zkUtils.exists(root));
// create the paths
- zkUtils.makeSurePersistentPathsExists(
- new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
+ zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
Assert.assertTrue(zkUtils.exists(root));
Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
@@ -213,8 +261,7 @@ public class TestZkUtils {
String version = "1";
String oldVersion = "0";
- zkUtils.makeSurePersistentPathsExists(
- new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+ zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
zkUtils.publishJobModelVersion(oldVersion, version);
Assert.assertEquals(version, zkUtils.getJobModelVersion());