You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:56 UTC

[7/9] samza git commit: SAMZA-1107:Job model publish

SAMZA-1107:Job model publish

add utils for publishing job model and job model version to ZK.

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fr...@yahoo.com>

Closes #67 from sborya/JobModelPublish1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d104013e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d104013e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d104013e

Branch: refs/heads/samza-fluent-api-v1
Commit: d104013ef16ffd959916a52e9e3f6e67a6e486b3
Parents: 4d7b3b3
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Mar 1 13:49:29 2017 -0800
Committer: navina <na...@apache.org>
Committed: Wed Mar 1 13:49:29 2017 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 75 +++++++++++++++++++-
 .../org/apache/samza/zk/TestZkKeyBuilder.java   | 12 ++++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 46 ++++++++++--
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 320cd49..73376b1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.zk;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,11 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,6 +161,44 @@ public class ZkUtils {
   }
 
   /**
+   * Publishes new job model into ZK.
+   * This call should FAIL if the node already exists.
+   * @param jobModelVersion  version of the jobModeL to publish
+   * @param jobModel jobModel to publish
+   *
+   */
+  public void publishJobModel(String jobModelVersion, JobModel jobModel) {
+    try {
+      ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+      String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
+      LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+      zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
+      LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
+    } catch (Exception e) {
+      LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * get the job model from ZK by version
+   * @param jobModelVersion jobModel version to get
+   * @return job model for this version
+   */
+  public JobModel getJobModel(String jobModelVersion) {
+    LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+    Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+    ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+    JobModel jm;
+    try {
+      jm = mmapper.readValue((String) data, JobModel.class);
+    } catch (IOException e) {
+      throw new SamzaException("failed to read JobModel from ZK", e);
+    }
+    return jm;
+  }
+
+  /**
    * read the jobmodel version from ZK
    * @return jobmodel version as a string
    */
@@ -163,6 +207,36 @@ public class ZkUtils {
   }
 
   /**
+   * publish the version number of the next JobModel
+   * @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
+   * @param newVersion - new version.
+   */
+  public void publishJobModelVersion(String oldVersion, String newVersion) {
+    Stat stat = new Stat();
+    String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
+        .getVersion() + ")");
+
+    if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+      throw new SamzaException(
+          "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
+    }
+    // data version is the ZK version of the data from the ZK.
+    int dataVersion = stat.getVersion();
+    try {
+      stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+    } catch (Exception e) {
+      String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
+      LOG.error(msg, e);
+      throw new SamzaException(msg);
+    }
+    LOG.info("pid=" + processorId +
+        " published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) + "(actual data version after update = " + stat.getVersion()
+        +    ")");
+  }
+
+
+  /**
    * verify that given paths exist in ZK
    * @param paths - paths to verify or create
    */
@@ -190,5 +264,4 @@ public class ZkUtils {
       zkClient.deleteRecursive(rootPath);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 8e048b2..b56d279 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -50,4 +50,16 @@ public class TestZkKeyBuilder {
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
   }
+
+  @Test
+  public void testJobModelPath() {
+
+    ZkKeyBuilder builder = new ZkKeyBuilder("test");
+
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
+    Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
+    String version = "2";
+    Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
+    Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b719e28..58c3ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,11 +18,17 @@
  */
 package org.apache.samza.zk;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -60,7 +66,6 @@ public class TestZkUtils {
       // Do nothing
     }
 
-
     zkUtils = new ZkUtils(
         KEY_BUILDER,
         zkClient,
@@ -96,11 +101,9 @@ public class TestZkUtils {
   public void testGetActiveProcessors() {
     Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
     zkUtils.registerProcessorAndGetId("processorData");
-
     Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
-
   }
-
+  
   @Test
   public void testSubscribeToJobModelVersionChange() {
 
@@ -157,6 +160,41 @@ public class TestZkUtils {
     Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  @Test
+  public void testPublishNewJobModel() {
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+    String root = keyBuilder.getRootPath();
+    zkClient.deleteRecursive(root);
+    String version = "1";
+    String oldVersion = "0";
+
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+
+    zkUtils.publishJobModelVersion(oldVersion, version);
+    Assert.assertEquals(version, zkUtils.getJobModelVersion());
+
+    String newerVersion = Long.toString(Long.valueOf(version) + 1);
+    zkUtils.publishJobModelVersion(version, newerVersion);
+    Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
+
+    try {
+      zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
+      Assert.fail("publish invalid version should've failed");
+    } catch (SamzaException e) {
+      // expected
+    }
+
+    // create job model
+    Map<String, String> configMap = new HashMap<>();
+    Map<Integer, ContainerModel> containers = new HashMap<>();
+    MapConfig config = new MapConfig(configMap);
+    JobModel jobModel = new JobModel(config, containers);
+
+    zkUtils.publishJobModel(version, jobModel);
+    Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
+  }
+
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {