You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/18 17:01:16 UTC
samza git commit: SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.
Repository: samza
Updated Branches:
refs/heads/master aac6368a2 -> bca978eb2
SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: Shanthoosh Venkataraman <sv...@LM-LSNSCDW5132.linkedin.biz>
Reviewers: Xinyu Liu <xi...@gmail.com>
Closes #466 from shanthoosh/SAMZA-1640
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bca978eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bca978eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bca978eb
Branch: refs/heads/master
Commit: bca978eb2cf1f1e781ed349ace87729b98be2145
Parents: aac6368
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Apr 18 10:00:56 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Apr 18 10:00:56 2018 -0700
----------------------------------------------------------------------
.../ClusterBasedJobCoordinator.java | 18 +++++++++--
.../org/apache/samza/job/model/JobModel.java | 13 ++------
.../samza/job/local/ProcessJobFactory.scala | 13 +++++++-
.../samza/job/local/ThreadJobFactory.scala | 14 +++++++--
.../model/TestSamzaObjectMapper.java | 33 +++++++++++++-------
5 files changed, 62 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 60cc65d..016d171 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.SamzaException;
@@ -31,10 +32,13 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.JmxServer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -49,7 +53,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Implements a JobCoordinator that is completely independent of the underlying cluster
* manager system. This {@link ClusterBasedJobCoordinator} handles functionality common
@@ -219,8 +222,17 @@ public class ClusterBasedJobCoordinator {
ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
// Remap changelog partitions to tasks
- Map prevPartitionMappings = changelogStreamManager.readPartitionMapping();
- changelogStreamManager.updatePartitionMapping(prevPartitionMappings, jobModel.getTaskPartitionMappings());
+ Map<TaskName, Integer> prevPartitionMappings = changelogStreamManager.readPartitionMapping();
+
+ Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
+ Map<String, ContainerModel> containers = jobModel.getContainers();
+ for (ContainerModel containerModel: containers.values()) {
+ for (TaskModel taskModel : containerModel.getTasks().values()) {
+ taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
+ }
+ }
+
+ changelogStreamManager.updatePartitionMapping(prevPartitionMappings, taskPartitionMappings);
containerProcessManager.start();
systemAdmins.start();
http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dc2bff8..d2f8fda 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -22,11 +22,10 @@ package org.apache.samza.job.model;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
/**
* <p>
@@ -40,6 +39,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
* an id, partition information, etc.
* </p>
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JobModel {
private static final String EMPTY_STRING = "";
private final Config config;
@@ -143,15 +143,6 @@ public class JobModel {
return containers;
}
- public Map<TaskName, Integer> getTaskPartitionMappings() {
- HashMap<TaskName, Integer> mappings = new HashMap<>();
- for (Map.Entry<String, ContainerModel> container: containers.entrySet()) {
- mappings.putAll(container.getValue().getTasks().entrySet().stream()
- .collect(Collectors.toMap(t -> t.getKey(), t -> t.getValue().getChangelogPartition().getPartitionId())));
- }
- return mappings;
- }
-
@Override
public String toString() {
return "JobModel [config=" + config + ", containers=" + containers + "]";
http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fe679d3..1401f82 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -19,15 +19,18 @@
package org.apache.samza.job.local
+import java.util
import org.apache.samza.SamzaException
import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
import org.apache.samza.config.TaskConfig._
+import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.util.{Logging, Util}
+import scala.collection.JavaConversions._
/**
* Creates a stand alone ProcessJob with the specified config.
@@ -49,7 +52,15 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
val jobModel = coordinator.jobModel
- changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+ val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
+ for (containerModel <- jobModel.getContainers.values) {
+ for (taskModel <- containerModel.getTasks.values) {
+ taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
+ }
+ }
+
+ changelogStreamManager.writePartitionMapping(taskPartitionMappings)
//create necessary checkpoint and changelog streams
val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 5ab4827..e5ce3c8 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -22,7 +22,7 @@ package org.apache.samza.job.local
import org.apache.samza.config.{Config, TaskConfigJava}
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.container.{SamzaContainer, SamzaContainerListener}
+import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.job.{StreamJob, StreamJobFactory}
@@ -31,6 +31,8 @@ import org.apache.samza.runtime.LocalContainerRunner
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.task.TaskFactoryUtil
import org.apache.samza.util.Logging
+import scala.collection.JavaConversions._
+import scala.collection.mutable
/**
* Creates a new Thread job with the given config
@@ -48,7 +50,15 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
val jobModel = coordinator.jobModel
- changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+ val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
+ for (containerModel <- jobModel.getContainers.values) {
+ for (taskModel <- containerModel.getTasks.values) {
+ taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
+ }
+ }
+
+ changelogStreamManager.writePartitionMapping(taskPartitionMappings)
//create necessary checkpoint and changelog streams
val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
http://git-wip-us.apache.org/repos/asf/samza/blob/bca978eb/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 505acac..02c3a9d 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -76,20 +76,29 @@ public class TestSamzaObjectMapper {
* When serializing, we serialize both the fields in 0.13. Deserialization correctly handles the fields in 0.13.
*/
@Test
- public void testContainerModelCompatible() {
- try {
- String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
- ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
- JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+ public void testContainerModelCompatible() throws Exception {
+ String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+ ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+ JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+
+ String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+ ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+ JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+
+ Assert.assertEquals(jobModel, jobModel1);
+ }
+
+ @Test
+ public void testUnknownFieldsInJobModelJsonDoesNotFailDeserialization() throws Exception {
+ String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}, \"task-partition-mapping\":{\"1\":null}}";
+ ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+ JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
- String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
- ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
- JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+ String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+ ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+ JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
- Assert.assertEquals(jobModel, jobModel1);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ Assert.assertEquals(jobModel, jobModel1);
}
}