You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this text.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/22 19:29:46 UTC

samza git commit: SAMZA-1561: Fix inconsistency problem in JobModel publish.

Repository: samza
Updated Branches:
  refs/heads/master e075e956f -> 24d22bb80


SAMZA-1561: Fix inconsistency problem in JobModel publish.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Jagadish V<ja...@apache.org>, Xinyu Liu<xi...@apache.org>

Closes #409 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/24d22bb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/24d22bb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/24d22bb8

Branch: refs/heads/master
Commit: 24d22bb801f5be6ac9b7afea3f0753e04fb2df2c
Parents: e075e95
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Jan 22 11:31:08 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Jan 22 11:31:08 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  7 +-----
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 23 ++++++++++++++++++++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 22 +++++++++++++++++++
 3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 0509474..f0c2ec7 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -201,12 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     // Assign the next version of JobModel
     String currentJMVersion = zkUtils.getJobModelVersion();
-    String nextJMVersion;
-    if (currentJMVersion == null) {
-      nextJMVersion = "1";
-    } else {
-      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
-    }
+    String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);
     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
 
     // Publish the new job model

http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 2f60d52..f34ba4e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -413,6 +413,29 @@ public class ZkUtils {
   }
 
   /**
+   * Generates the next JobModel version that should be used by a processor group in a rebalancing phase
+   * for coordination.
+   * @param currentJobModelVersion the current version of JobModel.
+   * @return the next JobModel version.
+   */
+  public String getNextJobModelVersion(String currentJobModelVersion) {
+    if (currentJobModelVersion == null) {
+      return  "1";
+    } else {
+      /**
+       * There can be an inconsistency between the maximum published jobModel version and the value stored in the jobModelVersion
+       * zookeeper node. Short-term fix is to read all published jobModel versions and choose the maximum version. If there's an
+       * inconsistency, update the jobModelVersionPath with maximum published jobModelVersion.
+       */
+      List<String> publishedJobModelVersions = zkClient.getChildren(keyBuilder.getJobModelPathPrefix());
+      metrics.reads.inc(publishedJobModelVersions.size());
+      String maxPublishedJMVersion = publishedJobModelVersions.stream()
+                                                              .max(Comparator.comparingInt(Integer::valueOf)).orElse("0");
+      return Integer.toString(Math.max(Integer.valueOf(currentJobModelVersion), Integer.valueOf(maxPublishedJMVersion)) + 1);
+    }
+  }
+
+  /**
    * publish the version number of the next JobModel
    * @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
    * @param newVersion - new version.

http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 3c8f67e..5d47dfc 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -414,4 +414,26 @@ public class TestZkUtils {
       Assert.fail("Sleep was interrupted");
     }
   }
+  @Test
+  public void testgetNextJobModelVersion() {
+    // Set up the Zk base paths for testing.
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+    String root = keyBuilder.getRootPath();
+    zkClient.deleteRecursive(root);
+    zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+
+    String version = "1";
+    String oldVersion = "0";
+
+    // Set zkNode JobModelVersion to 1.
+    zkUtils.publishJobModelVersion(oldVersion, version);
+
+    Assert.assertEquals(version, zkUtils.getJobModelVersion());
+
+    // Publish JobModel with a higher version (2).
+    zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>()));
+
+    // getNextJobModelVersion should return 3: one more than the larger of the stored version (1) and the highest published version (2).
+    Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
+  }
 }