You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/01/17 18:18:42 UTC
samza git commit: SAMZA-2056: Adding a TaskMode in the TaskModel
Repository: samza
Updated Branches:
refs/heads/master 5ff7f239f -> 0cb8cdbf0
SAMZA-2056: Adding a TaskMode in the TaskModel
This PR adds a TaskMode (an enum) to the TaskModel.
This assignment is persisted to the metastore using a SetTaskModeMapping type.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Shanthoosh Venkatraman <sv...@linkedin.com>
Closes #871 from rmatharu/taskmode
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0cb8cdbf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0cb8cdbf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0cb8cdbf
Branch: refs/heads/master
Commit: 0cb8cdbf08b2ed30e0a1c4a1b4fef7e5aaf76bf6
Parents: 5ff7f23
Author: Ray Matharu <rm...@linkedin.com>
Authored: Thu Jan 17 10:18:39 2019 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Jan 17 10:18:39 2019 -0800
----------------------------------------------------------------------
.../org/apache/samza/job/model/TaskMode.java | 38 +++++++++++
.../org/apache/samza/job/model/TaskModel.java | 34 +++++++++-
.../grouper/task/TaskAssignmentManager.java | 50 ++++++++------
.../stream/CoordinatorStreamValueSerde.java | 7 ++
.../stream/messages/SetTaskModeMapping.java | 70 ++++++++++++++++++++
.../serializers/model/JsonTaskModelMixIn.java | 1 +
.../serializers/model/SamzaObjectMapper.java | 24 +++++++
.../samza/coordinator/JobModelManager.scala | 7 +-
.../grouper/task/TestTaskAssignmentManager.java | 5 +-
.../samza/coordinator/TestJobModelManager.java | 11 +--
.../model/TestSamzaObjectMapper.java | 17 +++++
11 files changed, 232 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
new file mode 100644
index 0000000..c98b94d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.model;
+
+/**
+ * This defines the logical mode of a taskInstance.
+ * Active is the de facto mode for a task, i.e., tasks processing input, reading/writing state, producing output, etc.
+ * Standby is the mode for tasks that maintain warmed-up KV state by reading from their changelog.
+ */
+public enum TaskMode {
+ Active("Active"), Standby("Standby");
+
+ private final String mode;
+
+ private TaskMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String toString() {
+ return this.mode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
index 36917cf..c29a714 100644
--- a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -35,11 +35,31 @@ public class TaskModel implements Comparable<TaskModel> {
private final TaskName taskName;
private final Set<SystemStreamPartition> systemStreamPartitions;
private final Partition changelogPartition;
+ private final TaskMode taskMode;
- public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+ /**
+ * Create a TaskModel with the given taskName, SSPs, changelogPartition, and taskMode.
+ * @param taskName The desired taskName
+ * @param systemStreamPartitions SSPs assigned to this task.
+ * @param changelogPartition The changelog partition for this task.
+ * @param taskMode The mode of the task
+ */
+ public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition, TaskMode taskMode) {
this.taskName = taskName;
this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
this.changelogPartition = changelogPartition;
+ this.taskMode = taskMode;
+ }
+
+
+ /**
+ * Create a TaskModel for an active task with the given taskName, SSPs, and changelogPartition.
+ * @param taskName The desired taskName
+ * @param systemStreamPartitions SSPs assigned to this task.
+ * @param changelogPartition The changelog partition for this task.
+ */
+ public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+ this(taskName, systemStreamPartitions, changelogPartition, TaskMode.Active);
}
/**
@@ -66,6 +86,10 @@ public class TaskModel implements Comparable<TaskModel> {
return changelogPartition;
}
+ public TaskMode getTaskMode() {
+ return this.taskMode;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -87,6 +111,10 @@ public class TaskModel implements Comparable<TaskModel> {
return false;
}
+ if (!taskMode.equals(taskModel.taskMode)) {
+ return false;
+ }
+
return true;
}
@@ -95,12 +123,14 @@ public class TaskModel implements Comparable<TaskModel> {
int result = taskName.hashCode();
result = 31 * result + systemStreamPartitions.hashCode();
result = 31 * result + changelogPartition.hashCode();
+ result = 31 * result + taskMode.hashCode();
return result;
}
@Override
public String toString() {
- return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+ return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions
+ + ", changeLogPartition=" + changelogPartition + ", taskMode=" + this.taskMode + "]";
}
public int compareTo(TaskModel other) {
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index b6e946c..e151384 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -21,12 +21,13 @@ package org.apache.samza.container.grouper.task;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
@@ -45,9 +46,11 @@ public class TaskAssignmentManager {
private final Config config;
private final Map<String, String> taskNameToContainerId = new HashMap<>();
private final Serde<String> keySerde;
- private final Serde<String> valueSerde;
+ private final Serde<String> containerIdSerde;
+ private final Serde<String> taskModeSerde;
- private MetadataStore metadataStore;
+ private MetadataStore taskContainerMappingMetadataStore;
+ private MetadataStore taskModeMappingMetadataStore;
/**
* Builds the TaskAssignmentManager based upon {@link Config} and {@link MetricsRegistry}.
@@ -59,13 +62,13 @@ public class TaskAssignmentManager {
*/
public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry) {
this(config, metricsRegistry, new CoordinatorStreamKeySerde(SetTaskContainerMapping.TYPE),
- new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE));
+ new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE), new CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE));
}
/**
* Builds the LocalityManager based upon {@link Config} and {@link MetricsRegistry}.
*
- * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs before reading/writing
+ * Uses keySerde, containerIdSerde to serialize/deserialize (key, value) pairs before reading/writing
* into {@link MetadataStore}.
*
* Key and value serializer are different for yarn(uses CoordinatorStreamMessage) and standalone(uses native
@@ -73,15 +76,19 @@ public class TaskAssignmentManager {
* @param config the configuration required for setting up metadata store.
* @param metricsRegistry the registry for reporting metrics.
* @param keySerde the key serializer.
- * @param valueSerde the value serializer.
+ * @param containerIdSerde the value serializer for container IDs.
*/
- public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
+ public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String> taskModeSerde) {
this.config = config;
this.keySerde = keySerde;
- this.valueSerde = valueSerde;
+ this.containerIdSerde = containerIdSerde;
+ this.taskModeSerde = taskModeSerde;
+
MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
- this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
- this.metadataStore.init();
+ this.taskModeMappingMetadataStore = metadataStoreFactory.getMetadataStore(SetTaskModeMapping.TYPE, config, metricsRegistry);
+ this.taskContainerMappingMetadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
+ this.taskModeMappingMetadataStore.init();
+ this.taskContainerMappingMetadataStore.init();
}
/**
@@ -91,8 +98,8 @@ public class TaskAssignmentManager {
*/
public Map<String, String> readTaskAssignment() {
taskNameToContainerId.clear();
- metadataStore.all().forEach((taskName, valueBytes) -> {
- String containerId = valueSerde.fromBytes(valueBytes);
+ taskContainerMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
+ String containerId = containerIdSerde.fromBytes(valueBytes);
if (containerId != null) {
taskNameToContainerId.put(taskName, containerId);
}
@@ -106,20 +113,23 @@ public class TaskAssignmentManager {
*
* @param taskName the task name
* @param containerId the SamzaContainer ID or {@code null} to delete the mapping
+ * @param taskMode the mode of the task
*/
- public void writeTaskContainerMapping(String taskName, String containerId) {
+ public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
String existingContainerId = taskNameToContainerId.get(taskName);
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
- LOG.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
+ LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
} else {
- LOG.debug("Task \"{}\" assigned to container {}", taskName, containerId);
+ LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
}
if (containerId == null) {
- metadataStore.delete(taskName);
+ taskContainerMappingMetadataStore.delete(taskName);
+ taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
} else {
- metadataStore.put(taskName, valueSerde.toBytes(containerId));
+ taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
+ taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
taskNameToContainerId.put(taskName, containerId);
}
}
@@ -131,12 +141,14 @@ public class TaskAssignmentManager {
*/
public void deleteTaskContainerMappings(Iterable<String> taskNames) {
for (String taskName : taskNames) {
- metadataStore.delete(taskName);
+ taskContainerMappingMetadataStore.delete(taskName);
+ taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
}
}
public void close() {
- metadataStore.close();
+ taskContainerMappingMetadataStore.close();
+ taskModeMappingMetadataStore.close();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index fee099e..ddde105 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -25,6 +25,7 @@ import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
@@ -57,6 +58,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
} else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
return String.valueOf(changelogMapping.getPartition());
+ } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
+ SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(message);
+ return String.valueOf(setTaskModeMapping.getTaskMode());
} else {
throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
}
@@ -70,6 +74,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
} else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping(SOURCE, "", value);
return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
+ } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
+ SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(SOURCE, "", value);
+ return messageSerde.toBytes(setTaskModeMapping.getMessageMap());
} else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
return messageSerde.toBytes(changelogMapping.getMessageMap());
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
new file mode 100644
index 0000000..3a8bbd6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream.messages;
+
+
+/**
+ * SetTaskModeMapping is a {@link CoordinatorStreamMessage} used internally
+ * by the Samza framework to persist the task-to-taskmode mappings.
+ *
+ * Structure of the message looks like:
+ *
+ * <pre>
+ * key => [1, "set-task-mode-assignment", $TaskName]
+ *
+ * message => {
+ * "host": "192.168.0.1",
+ * "source": "SamzaContainer",
+ * "username":"app",
+ * "timestamp": 1456177487325,
+ * "values": {
+ * "taskMode": "Active"
+ * }
+ * }
+ * </pre>
+ * */
+public class SetTaskModeMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-task-mode-assignment";
+ public static final String TASKMODE_KEY = "taskMode";
+
+ /**
+ * SetTaskModeMapping is used to set the task to taskMode mapping information.
+ * @param message which holds the mapped information.
+ */
+ public SetTaskModeMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ * SetTaskModeMapping is used to set the task to taskMode mapping information.
+ * @param source the source of the message
+ * @param taskName the taskName which is used to persist the message
+ * @param taskMode the taskMode of the task
+ */
+ public SetTaskModeMapping(String source, String taskName, String taskMode) {
+ super(source);
+ setType(TYPE);
+ setKey(taskName);
+ putMessageValue(TASKMODE_KEY, taskMode.toString());
+ }
+
+ public String getTaskMode() {
+ return getMessageValue(TASKMODE_KEY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 13a7d59..a812968 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -28,6 +28,7 @@ import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonProperty;
+
/**
* A mix-in Jackson class to convert Samza's TaskModel to/from JSON.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 15206e1..a247fb3 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,6 +29,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
@@ -81,10 +82,12 @@ public class SamzaObjectMapper {
module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
module.addSerializer(TaskName.class, new TaskNameSerializer());
+ module.addSerializer(TaskMode.class, new TaskModeSerializer());
module.addDeserializer(Partition.class, new PartitionDeserializer());
module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
module.addDeserializer(Config.class, new ConfigDeserializer());
+ module.addDeserializer(TaskMode.class, new TaskModeDeserializer());
// Setup mixins for data models.
mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
@@ -175,6 +178,27 @@ public class SamzaObjectMapper {
}
}
+ public static class TaskModeSerializer extends JsonSerializer<TaskMode> {
+ @Override
+ public void serialize(TaskMode taskMode, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+ jsonGenerator.writeObject(taskMode.toString());
+ }
+ }
+
+ public static class TaskModeDeserializer extends JsonDeserializer<TaskMode> {
+ @Override
+ public TaskMode deserialize(JsonParser jsonParser, DeserializationContext context)
+ throws IOException, JsonProcessingException {
+ ObjectCodec oc = jsonParser.getCodec();
+ JsonNode node = oc.readTree(jsonParser);
+ if (node == null || node.getTextValue().equals("")) {
+ return TaskMode.Active;
+ } else {
+ return TaskMode.valueOf(node.getTextValue());
+ }
+ }
+ }
+
public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5fb71f3..5c983db 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator
import java.util
import java.util.concurrent.atomic.AtomicReference
+
import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
@@ -32,9 +33,7 @@ import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system._
@@ -174,7 +173,7 @@ object JobModelManager extends Logging {
for (container <- jobModel.getContainers.values()) {
for (taskName <- container.getTasks.keySet) {
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId)
+ taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, TaskMode.Active)
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 60164b2..443262e 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.*;
import org.apache.samza.util.CoordinatorStreamUtil;
@@ -72,7 +73,7 @@ public class TestTaskAssignmentManager {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
- taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+ taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
}
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
@@ -89,7 +90,7 @@ public class TestTaskAssignmentManager {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
- taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+ taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
}
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 6048466..ae3d801 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -34,6 +34,7 @@ import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.system.StreamMetadataCache;
@@ -219,7 +220,7 @@ public class TestJobModelManager {
when(mockJobModel.getContainers()).thenReturn(containerMapping);
when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>());
- Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any());
+ Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any());
JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockGrouperMetadata);
@@ -232,9 +233,9 @@ public class TestJobModelManager {
// Verifications
Mockito.verify(mockJobModel, atLeast(1)).getContainers();
Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings((Iterable<String>) taskNames);
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id");
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id");
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id");
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id");
+ Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
+ Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
+ Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);
+ Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 0f90dd5..305efcd 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -22,6 +22,7 @@ package org.apache.samza.serializers.model;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
@@ -31,6 +32,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStreamPartition;
import org.codehaus.jackson.map.ObjectMapper;
@@ -75,6 +77,20 @@ public class TestSamzaObjectMapper {
}
@Test
+ public void testSerializeTaskModel() throws IOException {
+ TaskModel taskModel = new TaskModel(new TaskName("Standby Partition 0"), new HashSet<>(), new Partition(0),
+ TaskMode.Standby);
+ String serializedString = this.samzaObjectMapper.writeValueAsString(taskModel);
+ TaskModel deserializedTaskModel = this.samzaObjectMapper.readValue(serializedString, TaskModel.class);
+ assertEquals(taskModel, deserializedTaskModel);
+
+ String sampleSerializedString = "{\"task-name\":\"Partition 0\",\"system-stream-partitions\":[],\"changelog-partition\":0}";
+ deserializedTaskModel = this.samzaObjectMapper.readValue(sampleSerializedString, TaskModel.class);
+ taskModel = new TaskModel(new TaskName("Partition 0"), new HashSet<>(), new Partition(0), TaskMode.Active);
+ assertEquals(taskModel, deserializedTaskModel);
+ }
+
+ @Test
public void testDeserializeJobModel() throws IOException {
ObjectNode asJson = buildJobModelJson();
assertEquals(this.jobModel, deserializeFromObjectNode(asJson));
@@ -198,6 +214,7 @@ public class TestSamzaObjectMapper {
containerModel1TaskTestJson.put("task-name", "test");
containerModel1TaskTestJson.put("system-stream-partitions", containerModel1TaskTestSSPsJson);
containerModel1TaskTestJson.put("changelog-partition", 2);
+ containerModel1TaskTestJson.put("task-mode", "Active");
ObjectNode containerModel1TasksJson = objectMapper.createObjectNode();
containerModel1TasksJson.put("test", containerModel1TaskTestJson);