You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/22 19:29:46 UTC
samza git commit: SAMZA-1561: Fix inconsistency problem in JobModel publish.
Repository: samza
Updated Branches:
refs/heads/master e075e956f -> 24d22bb80
SAMZA-1561: Fix inconsistency problem in JobModel publish.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Jagadish V<ja...@apache.org>, Xinyu Liu<xi...@apache.org>
Closes #409 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/24d22bb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/24d22bb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/24d22bb8
Branch: refs/heads/master
Commit: 24d22bb801f5be6ac9b7afea3f0753e04fb2df2c
Parents: e075e95
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Jan 22 11:31:08 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Jan 22 11:31:08 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/zk/ZkJobCoordinator.java | 7 +-----
.../main/java/org/apache/samza/zk/ZkUtils.java | 23 ++++++++++++++++++++
.../java/org/apache/samza/zk/TestZkUtils.java | 22 +++++++++++++++++++
3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 0509474..f0c2ec7 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -201,12 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
// Assign the next version of JobModel
String currentJMVersion = zkUtils.getJobModelVersion();
- String nextJMVersion;
- if (currentJMVersion == null) {
- nextJMVersion = "1";
- } else {
- nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
- }
+ String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);
- LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
+ LOG.info("pid=" + processorId + " Generated new Job Model. Version = " + nextJMVersion);
// Publish the new job model
http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 2f60d52..f34ba4e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -413,6 +413,29 @@ public class ZkUtils {
}
/**
+ * Generates the next JobModel version that should be used by a processor group in a rebalancing phase
+ * for coordination. The result is one greater than the larger of {@code currentJobModelVersion} and the
+ * maximum version published under the JobModel path prefix; the jobModelVersion zookeeper node is not modified.
+ * @param currentJobModelVersion the current version of JobModel; if {@code null}, "1" is returned.
+ * @return the next JobModel version.
+ * @throws NumberFormatException if the current version or a published version is not a parseable int.
+ */
+ public String getNextJobModelVersion(String currentJobModelVersion) {
+ if (currentJobModelVersion == null) {
+ return "1";
+ }
+ // The value stored in the jobModelVersion zookeeper node can be inconsistent with the maximum published
+ // jobModel version. Short term fix: read all published jobModel versions and use the larger of the two.
+ List<String> publishedJobModelVersions = zkClient.getChildren(keyBuilder.getJobModelPathPrefix());
+ metrics.reads.inc(publishedJobModelVersions.size());
+ int maxPublishedJMVersion = publishedJobModelVersions.stream()
+ .mapToInt(Integer::parseInt)
+ .max()
+ .orElse(0);
+ return Integer.toString(Math.max(Integer.parseInt(currentJobModelVersion), maxPublishedJMVersion) + 1);
+ }
+
+ /**
* publish the version number of the next JobModel
* @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
* @param newVersion - new version.
http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 3c8f67e..5d47dfc 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -414,4 +414,26 @@ public class TestZkUtils {
Assert.fail("Sleep was interrupted");
}
}
+ @Test
+ public void testGetNextJobModelVersion() {
+ // Set up the Zk base paths for testing.
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+ String root = keyBuilder.getRootPath();
+ zkClient.deleteRecursive(root);
+ zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+
+ String version = "1";
+ String oldVersion = "0";
+
+ // Set zkNode JobModelVersion to 1.
+ zkUtils.publishJobModelVersion(oldVersion, version);
+
+ Assert.assertEquals(version, zkUtils.getJobModelVersion());
+
+ // Publish JobModel with a higher version (2).
+ zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>()));
+
+ // Next version is 3: one more than the published version 2, which exceeds the stored version 1.
+ Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
+ }
}