You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/18 17:01:16 UTC

samza git commit: SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.

Repository: samza
Updated Branches:
  refs/heads/master aac6368a2 -> bca978eb2


SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: Shanthoosh Venkataraman <sv...@LM-LSNSCDW5132.linkedin.biz>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #466 from shanthoosh/SAMZA-1640


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bca978eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bca978eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bca978eb

Branch: refs/heads/master
Commit: bca978eb2cf1f1e781ed349ace87729b98be2145
Parents: aac6368
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Apr 18 10:00:56 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Apr 18 10:00:56 2018 -0700

----------------------------------------------------------------------
 .../ClusterBasedJobCoordinator.java             | 18 +++++++++--
 .../org/apache/samza/job/model/JobModel.java    | 13 ++------
 .../samza/job/local/ProcessJobFactory.scala     | 13 +++++++-
 .../samza/job/local/ThreadJobFactory.scala      | 14 +++++++--
 .../model/TestSamzaObjectMapper.java            | 33 +++++++++++++-------
 5 files changed, 62 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 60cc65d..016d171 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
@@ -31,10 +32,13 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -49,7 +53,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 /**
  * Implements a JobCoordinator that is completely independent of the underlying cluster
  * manager system. This {@link ClusterBasedJobCoordinator} handles functionality common
@@ -219,8 +222,17 @@ public class ClusterBasedJobCoordinator {
       ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
 
       // Remap changelog partitions to tasks
-      Map prevPartitionMappings = changelogStreamManager.readPartitionMapping();
-      changelogStreamManager.updatePartitionMapping(prevPartitionMappings, jobModel.getTaskPartitionMappings());
+      Map<TaskName, Integer> prevPartitionMappings = changelogStreamManager.readPartitionMapping();
+
+      Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
+      Map<String, ContainerModel> containers = jobModel.getContainers();
+      for (ContainerModel containerModel: containers.values()) {
+        for (TaskModel taskModel : containerModel.getTasks().values()) {
+          taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
+        }
+      }
+
+      changelogStreamManager.updatePartitionMapping(prevPartitionMappings, taskPartitionMappings);
 
       containerProcessManager.start();
       systemAdmins.start();

http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dc2bff8..d2f8fda 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -22,11 +22,10 @@ package org.apache.samza.job.model;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 /**
  * <p>
@@ -40,6 +39,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * an id, partition information, etc.
  * </p>
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class JobModel {
   private static final String EMPTY_STRING = "";
   private final Config config;
@@ -143,15 +143,6 @@ public class JobModel {
     return containers;
   }
 
-  public Map<TaskName, Integer> getTaskPartitionMappings() {
-    HashMap<TaskName, Integer> mappings = new HashMap<>();
-    for (Map.Entry<String, ContainerModel> container: containers.entrySet()) {
-      mappings.putAll(container.getValue().getTasks().entrySet().stream()
-          .collect(Collectors.toMap(t -> t.getKey(), t -> t.getValue().getChangelogPartition().getPartitionId())));
-    }
-    return mappings;
-  }
-
   @Override
   public String toString() {
     return "JobModel [config=" + config + ", containers=" + containers + "]";

http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fe679d3..1401f82 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -19,15 +19,18 @@
 
 package org.apache.samza.job.local
 
+import java.util
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.config.TaskConfig._
+import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.util.{Logging, Util}
+import scala.collection.JavaConversions._
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
@@ -49,7 +52,15 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
 
     val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
     val jobModel = coordinator.jobModel
-    changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+    val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
+    for (containerModel <- jobModel.getContainers.values) {
+      for (taskModel <- containerModel.getTasks.values) {
+        taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
+      }
+    }
+
+    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
     //create necessary checkpoint and changelog streams
     val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 5ab4827..e5ce3c8 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -22,7 +22,7 @@ package org.apache.samza.job.local
 import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.container.{SamzaContainer, SamzaContainerListener}
+import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
@@ -31,6 +31,8 @@ import org.apache.samza.runtime.LocalContainerRunner
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
+import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 /**
  * Creates a new Thread job with the given config
@@ -48,7 +50,15 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
 
     val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
     val jobModel = coordinator.jobModel
-    changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+    val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
+    for (containerModel <- jobModel.getContainers.values) {
+      for (taskModel <- containerModel.getTasks.values) {
+        taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
+      }
+    }
+
+    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
     //create necessary checkpoint and changelog streams
     val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 505acac..02c3a9d 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -76,20 +76,29 @@ public class TestSamzaObjectMapper {
    * When serializing, we serialize both the fields in 0.13. Deserialization correctly handles the fields in 0.13.
    */
   @Test
-  public void testContainerModelCompatible() {
-    try {
-      String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
-      ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
-      JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+  public void testContainerModelCompatible() throws Exception {
+    String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+    JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+
+    String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+    ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+    JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+
+    Assert.assertEquals(jobModel, jobModel1);
+  }
+
+  @Test
+  public void testUnknownFieldsInJobModelJsonDoesNotFailDeserialization() throws Exception {
+    String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}, \"task-partition-mapping\":{\"1\":null}}";
+    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+    JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
 
-      String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
-      ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
-      JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+    String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+    ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+    JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
 
-      Assert.assertEquals(jobModel, jobModel1);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+    Assert.assertEquals(jobModel, jobModel1);
   }
 
 }