You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:56 UTC
[7/9] samza git commit: SAMZA-1107:Job model publish
SAMZA-1107:Job model publish
add utils for publishing job model and job model version to ZK.
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fr...@yahoo.com>
Closes #67 from sborya/JobModelPublish1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d104013e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d104013e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d104013e
Branch: refs/heads/samza-fluent-api-v1
Commit: d104013ef16ffd959916a52e9e3f6e67a6e486b3
Parents: 4d7b3b3
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Mar 1 13:49:29 2017 -0800
Committer: navina <na...@apache.org>
Committed: Wed Mar 1 13:49:29 2017 -0800
----------------------------------------------------------------------
.../main/java/org/apache/samza/zk/ZkUtils.java | 75 +++++++++++++++++++-
.../org/apache/samza/zk/TestZkKeyBuilder.java | 12 ++++
.../java/org/apache/samza/zk/TestZkUtils.java | 46 ++++++++++--
3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 320cd49..73376b1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
package org.apache.samza.zk;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,11 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,6 +161,44 @@ public class ZkUtils {
}
/**
+ * Publishes new job model into ZK.
+ * This call should FAIL if the node already exists.
+ * @param jobModelVersion version of the jobModeL to publish
+ * @param jobModel jobModel to publish
+ *
+ */
+ public void publishJobModel(String jobModelVersion, JobModel jobModel) {
+ try {
+ ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+ String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
+ LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+ zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
+ LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
+ } catch (Exception e) {
+ LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * get the job model from ZK by version
+ * @param jobModelVersion jobModel version to get
+ * @return job model for this version
+ */
+ public JobModel getJobModel(String jobModelVersion) {
+ LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+ Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+ ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+ JobModel jm;
+ try {
+ jm = mmapper.readValue((String) data, JobModel.class);
+ } catch (IOException e) {
+ throw new SamzaException("failed to read JobModel from ZK", e);
+ }
+ return jm;
+ }
+
+ /**
* read the jobmodel version from ZK
* @return jobmodel version as a string
*/
@@ -163,6 +207,36 @@ public class ZkUtils {
}
/**
+ * publish the version number of the next JobModel
+ * @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
+ * @param newVersion - new version.
+ */
+ public void publishJobModelVersion(String oldVersion, String newVersion) {
+ Stat stat = new Stat();
+ String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+ LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
+ .getVersion() + ")");
+
+ if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+ throw new SamzaException(
+ "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
+ }
+ // data version is the ZK version of the data from the ZK.
+ int dataVersion = stat.getVersion();
+ try {
+ stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+ } catch (Exception e) {
+ String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
+ LOG.error(msg, e);
+ throw new SamzaException(msg);
+ }
+ LOG.info("pid=" + processorId +
+ " published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) + "(actual data version after update = " + stat.getVersion()
+ + ")");
+ }
+
+
+ /**
* verify that given paths exist in ZK
* @param paths - paths to verify or create
*/
@@ -190,5 +264,4 @@ public class ZkUtils {
zkClient.deleteRecursive(rootPath);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 8e048b2..b56d279 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -50,4 +50,16 @@ public class TestZkKeyBuilder {
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
}
+
+ @Test
+ public void testJobModelPath() {
+
+ ZkKeyBuilder builder = new ZkKeyBuilder("test");
+
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
+ Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
+ String version = "2";
+ Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
+ Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b719e28..58c3ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,11 +18,17 @@
*/
package org.apache.samza.zk;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.BooleanSupplier;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.After;
import org.junit.AfterClass;
@@ -60,7 +66,6 @@ public class TestZkUtils {
// Do nothing
}
-
zkUtils = new ZkUtils(
KEY_BUILDER,
zkClient,
@@ -96,11 +101,9 @@ public class TestZkUtils {
public void testGetActiveProcessors() {
Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
zkUtils.registerProcessorAndGetId("processorData");
-
Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
-
}
-
+
@Test
public void testSubscribeToJobModelVersionChange() {
@@ -157,6 +160,41 @@ public class TestZkUtils {
Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
}
+ @Test
+ public void testPublishNewJobModel() {
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+ String root = keyBuilder.getRootPath();
+ zkClient.deleteRecursive(root);
+ String version = "1";
+ String oldVersion = "0";
+
+ zkUtils.makeSurePersistentPathsExists(
+ new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+
+ zkUtils.publishJobModelVersion(oldVersion, version);
+ Assert.assertEquals(version, zkUtils.getJobModelVersion());
+
+ String newerVersion = Long.toString(Long.valueOf(version) + 1);
+ zkUtils.publishJobModelVersion(version, newerVersion);
+ Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
+
+ try {
+ zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
+ Assert.fail("publish invalid version should've failed");
+ } catch (SamzaException e) {
+ // expected
+ }
+
+ // create job model
+ Map<String, String> configMap = new HashMap<>();
+ Map<Integer, ContainerModel> containers = new HashMap<>();
+ MapConfig config = new MapConfig(configMap);
+ JobModel jobModel = new JobModel(config, containers);
+
+ zkUtils.publishJobModel(version, jobModel);
+ Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
+ }
+
public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
long delay = startDelayMs;
while (delay < maxDelayMs) {