You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/01/17 18:18:42 UTC

samza git commit: SAMZA-2056: Adding a TaskMode in the TaskModel

Repository: samza
Updated Branches:
  refs/heads/master 5ff7f239f -> 0cb8cdbf0


SAMZA-2056: Adding a TaskMode in the TaskModel

This PR adds a TaskMode (an enum) to the TaskModel.
This assignment is persisted to the metastore using a SetTaskModeMapping type.

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Shanthoosh Venkatraman <sv...@linkedin.com>

Closes #871 from rmatharu/taskmode


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0cb8cdbf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0cb8cdbf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0cb8cdbf

Branch: refs/heads/master
Commit: 0cb8cdbf08b2ed30e0a1c4a1b4fef7e5aaf76bf6
Parents: 5ff7f23
Author: Ray Matharu <rm...@linkedin.com>
Authored: Thu Jan 17 10:18:39 2019 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Jan 17 10:18:39 2019 -0800

----------------------------------------------------------------------
 .../org/apache/samza/job/model/TaskMode.java    | 38 +++++++++++
 .../org/apache/samza/job/model/TaskModel.java   | 34 +++++++++-
 .../grouper/task/TaskAssignmentManager.java     | 50 ++++++++------
 .../stream/CoordinatorStreamValueSerde.java     |  7 ++
 .../stream/messages/SetTaskModeMapping.java     | 70 ++++++++++++++++++++
 .../serializers/model/JsonTaskModelMixIn.java   |  1 +
 .../serializers/model/SamzaObjectMapper.java    | 24 +++++++
 .../samza/coordinator/JobModelManager.scala     |  7 +-
 .../grouper/task/TestTaskAssignmentManager.java |  5 +-
 .../samza/coordinator/TestJobModelManager.java  | 11 +--
 .../model/TestSamzaObjectMapper.java            | 17 +++++
 11 files changed, 232 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
new file mode 100644
index 0000000..c98b94d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskMode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.model;
+
+/**
+ * This defines the logical mode of a taskInstance.
+ * Active is the de facto mode for a task, i.e., tasks processing input, reading/writing state, producing output, etc.
+ * Standby is the mode for tasks that maintain warmed-up KV state by reading from their changelog.
+ */
+public enum TaskMode {
+  /** The task processes input, reads/writes state and produces output. */
+  Active("Active"),
+
+  /** The task maintains warmed-up KV state by reading from its changelog. */
+  Standby("Standby");
+
+  // Display name of the mode. Kept identical to the constant name so that
+  // TaskMode.valueOf(mode.toString()) round-trips.
+  private final String mode;
+
+  private TaskMode(String mode) {
+    this.mode = mode;
+  }
+
+  /**
+   * Returns the name of this mode.
+   * @return "Active" or "Standby"
+   */
+  @Override
+  public String toString() {
+    return this.mode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
index 36917cf..c29a714 100644
--- a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -35,11 +35,31 @@ public class TaskModel implements Comparable<TaskModel> {
   private final TaskName taskName;
   private final Set<SystemStreamPartition> systemStreamPartitions;
   private final Partition changelogPartition;
+  private final TaskMode taskMode;
 
-  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+  /**
+   *  Create a TaskModel with the given taskName, SSPs, changelogPartition, and taskMode.
+   * @param taskName The desired taskName
+   * @param systemStreamPartitions SSPs assigned to this task.
+   * @param changelogPartition The changelog SSP for this task.
+   * @param taskMode The mode of the task
+   */
+  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition, TaskMode taskMode) {
     this.taskName = taskName;
     this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
     this.changelogPartition = changelogPartition;
+    this.taskMode = taskMode;
+  }
+
+
+  /**
+   *  Create a TaskModel for an active task with the given taskName, SSPs, and changelogPartition.
+   * @param taskName The desired taskName
+   * @param systemStreamPartitions SSPs assigned to this task.
+   * @param changelogPartition The changelog SSP for this task.
+   */
+  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+    this(taskName, systemStreamPartitions, changelogPartition, TaskMode.Active);
   }
 
   /**
@@ -66,6 +86,10 @@ public class TaskModel implements Comparable<TaskModel> {
     return changelogPartition;
   }
 
+  /**
+   * Returns the mode of this task.
+   * @return the {@link TaskMode} this model was constructed with
+   */
+  public TaskMode getTaskMode() {
+    return taskMode;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -87,6 +111,10 @@ public class TaskModel implements Comparable<TaskModel> {
       return false;
     }
 
+    if (!taskMode.equals(taskModel.taskMode)) {
+      return false;
+    }
+
     return true;
   }
 
@@ -95,12 +123,14 @@ public class TaskModel implements Comparable<TaskModel> {
     int result = taskName.hashCode();
     result = 31 * result + systemStreamPartitions.hashCode();
     result = 31 * result + changelogPartition.hashCode();
+    result = 31 * result + taskMode.hashCode();
     return result;
   }
 
   @Override
   public String toString() {
-    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions
+        + ", changeLogPartition=" + changelogPartition + ", taskMode=" + this.taskMode + "]";
   }
 
   public int compareTo(TaskModel other) {

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index b6e946c..e151384 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -21,12 +21,13 @@ package org.apache.samza.container.grouper.task;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -45,9 +46,11 @@ public class TaskAssignmentManager {
   private final Config config;
   private final Map<String, String> taskNameToContainerId = new HashMap<>();
   private final Serde<String> keySerde;
-  private final Serde<String> valueSerde;
+  private final Serde<String> containerIdSerde;
+  private final Serde<String> taskModeSerde;
 
-  private MetadataStore metadataStore;
+  private MetadataStore taskContainerMappingMetadataStore;
+  private MetadataStore taskModeMappingMetadataStore;
 
   /**
    * Builds the TaskAssignmentManager based upon {@link Config} and {@link MetricsRegistry}.
@@ -59,13 +62,13 @@ public class TaskAssignmentManager {
    */
   public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry) {
     this(config, metricsRegistry, new CoordinatorStreamKeySerde(SetTaskContainerMapping.TYPE),
-         new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE));
+         new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE), new CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE));
   }
 
   /**
    * Builds the LocalityManager based upon {@link Config} and {@link MetricsRegistry}.
    *
-   * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs before reading/writing
+   * Uses keySerde, containerIdSerde to serialize/deserialize (key, value) pairs before reading/writing
    * into {@link MetadataStore}.
    *
    * Key and value serializer are different for yarn(uses CoordinatorStreamMessage) and standalone(uses native
@@ -73,15 +76,19 @@ public class TaskAssignmentManager {
    * @param config the configuration required for setting up metadata store.
    * @param metricsRegistry the registry for reporting metrics.
    * @param keySerde the key serializer.
-   * @param valueSerde the value serializer.
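+   * @param containerIdSerde the value serializer for task-to-container-id mappings.
+   * @param taskModeSerde the value serializer for task-to-task-mode mappings.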
    */
-  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
+  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String> taskModeSerde) {
     this.config = config;
     this.keySerde = keySerde;
-    this.valueSerde = valueSerde;
+    this.containerIdSerde = containerIdSerde;
+    this.taskModeSerde = taskModeSerde;
+
     MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
-    this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
-    this.metadataStore.init();
+    this.taskModeMappingMetadataStore = metadataStoreFactory.getMetadataStore(SetTaskModeMapping.TYPE, config, metricsRegistry);
+    this.taskContainerMappingMetadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
+    this.taskModeMappingMetadataStore.init();
+    this.taskContainerMappingMetadataStore.init();
   }
 
   /**
@@ -91,8 +98,8 @@ public class TaskAssignmentManager {
    */
   public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
-    metadataStore.all().forEach((taskName, valueBytes) -> {
-        String containerId = valueSerde.fromBytes(valueBytes);
+    taskContainerMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
+        String containerId = containerIdSerde.fromBytes(valueBytes);
         if (containerId != null) {
           taskNameToContainerId.put(taskName, containerId);
         }
@@ -106,20 +113,23 @@ public class TaskAssignmentManager {
    *
    * @param taskName    the task name
    * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
+   * @param taskMode the mode of the task
    */
-  public void writeTaskContainerMapping(String taskName, String containerId) {
+  public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
     String existingContainerId = taskNameToContainerId.get(taskName);
     if (existingContainerId != null && !existingContainerId.equals(containerId)) {
-      LOG.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
+      LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
     } else {
-      LOG.debug("Task \"{}\" assigned to container {}", taskName, containerId);
+      LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
     }
 
     if (containerId == null) {
-      metadataStore.delete(taskName);
+      taskContainerMappingMetadataStore.delete(taskName);
+      taskModeMappingMetadataStore.delete(taskName);
       taskNameToContainerId.remove(taskName);
     } else {
-      metadataStore.put(taskName, valueSerde.toBytes(containerId));
+      taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
+      taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
       taskNameToContainerId.put(taskName, containerId);
     }
   }
@@ -131,12 +141,14 @@ public class TaskAssignmentManager {
    */
   public void deleteTaskContainerMappings(Iterable<String> taskNames) {
     for (String taskName : taskNames) {
-      metadataStore.delete(taskName);
+      taskContainerMappingMetadataStore.delete(taskName);
+      taskModeMappingMetadataStore.delete(taskName);
       taskNameToContainerId.remove(taskName);
     }
   }
 
   public void close() {
-    metadataStore.close();
+    taskContainerMappingMetadataStore.close();
+    taskModeMappingMetadataStore.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index fee099e..ddde105 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -25,6 +25,7 @@ import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 
@@ -57,6 +58,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
       return String.valueOf(changelogMapping.getPartition());
+    } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
+      SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(message);
+      return String.valueOf(setTaskModeMapping.getTaskMode());
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
@@ -70,6 +74,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
       SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping(SOURCE, "", value);
       return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
+      SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(SOURCE, "", value);
+      return messageSerde.toBytes(setTaskModeMapping.getMessageMap());
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
       return messageSerde.toBytes(changelogMapping.getMessageMap());

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
new file mode 100644
index 0000000..3a8bbd6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskModeMapping.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream.messages;
+
+
+/**
+ * SetTaskModeMapping is a {@link CoordinatorStreamMessage} used internally
+ * by the Samza framework to persist the task-to-taskmode mappings.
+ *
+ * Structure of the message looks like:
+ *
+ * <pre>
+ * key =&gt; [1, "set-task-mode-assignment", $TaskName]
+ *
+ * message =&gt; {
+ *     "host": "192.168.0.1",
+ *     "source": "SamzaContainer",
+ *     "username":"app",
+ *     "timestamp": 1456177487325,
+ *     "values": {
+ *         "taskMode": "Active"
+ *     }
+ * }
+ * </pre>
+ * */
+public class SetTaskModeMapping extends CoordinatorStreamMessage {
+  // Type set on every message created through the (source, taskName, taskMode) constructor.
+  public static final String TYPE = "set-task-mode-assignment";
+  // Key under which the task mode is stored in the message values.
+  public static final String TASKMODE_KEY = "taskMode";
+
+  /**
+   * Builds a SetTaskModeMapping from an existing message, reusing its key array and message map.
+   * @param message which holds the mapped information.
+   */
+  public SetTaskModeMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetTaskModeMapping is used to set the task to taskMode mapping information.
+   * @param source   the source of the message
+   * @param taskName the taskName which is used to persist the message
+   * @param taskMode the taskMode of the task
+   */
+  public SetTaskModeMapping(String source, String taskName, String taskMode) {
+    super(source);
+    setType(TYPE);
+    setKey(taskName);
+    // NOTE(review): taskMode is already a String, so toString() is a no-op other than
+    // throwing a NullPointerException when taskMode is null.
+    putMessageValue(TASKMODE_KEY, taskMode.toString());
+  }
+
+  /**
+   * @return the value stored under {@link #TASKMODE_KEY} in this message.
+   */
+  public String getTaskMode() {
+    return getMessageValue(TASKMODE_KEY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 13a7d59..a812968 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -28,6 +28,7 @@ import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+
 /**
  * A mix-in Jackson class to convert Samza's TaskModel to/from JSON.
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 15206e1..a247fb3 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,6 +29,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
@@ -81,10 +82,12 @@ public class SamzaObjectMapper {
     module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
     module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
     module.addSerializer(TaskName.class, new TaskNameSerializer());
+    module.addSerializer(TaskMode.class, new TaskModeSerializer());
     module.addDeserializer(Partition.class, new PartitionDeserializer());
     module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
     module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
     module.addDeserializer(Config.class, new ConfigDeserializer());
+    module.addDeserializer(TaskMode.class, new TaskModeDeserializer());
 
     // Setup mixins for data models.
     mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
@@ -175,6 +178,27 @@ public class SamzaObjectMapper {
     }
   }
 
+  /**
+   * Writes a {@link TaskMode} to JSON using its {@code toString()} representation.
+   */
+  public static class TaskModeSerializer extends JsonSerializer<TaskMode> {
+    @Override
+    public void serialize(TaskMode taskMode, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      final String modeName = taskMode.toString();
+      jsonGenerator.writeObject(modeName);
+    }
+  }
+
+  public static class TaskModeDeserializer extends JsonDeserializer<TaskMode> {
+    @Override
+    public TaskMode deserialize(JsonParser jsonParser, DeserializationContext context)
+        throws IOException, JsonProcessingException {
+      ObjectCodec oc = jsonParser.getCodec();
+      JsonNode node = oc.readTree(jsonParser);
+      if (node == null || node.getTextValue().equals("")) {
+        return TaskMode.Active;
+      } else {
+        return TaskMode.valueOf(node.getTextValue());
+      }
+    }
+  }
+
   public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
     @Override
     public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5fb71f3..5c983db 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator
 
 import java.util
 import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
@@ -32,9 +33,7 @@ import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system._
@@ -174,7 +173,7 @@ object JobModelManager extends Logging {
 
     for (container <- jobModel.getContainers.values()) {
       for (taskName <- container.getTasks.keySet) {
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId)
+        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, TaskMode.Active)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 60164b2..443262e 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.*;
 import org.apache.samza.util.CoordinatorStreamUtil;
@@ -72,7 +73,7 @@ public class TestTaskAssignmentManager {
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
-      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
     }
 
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
@@ -89,7 +90,7 @@ public class TestTaskAssignmentManager {
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
-      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
     }
 
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 6048466..ae3d801 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -34,6 +34,7 @@ import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.system.StreamMetadataCache;
@@ -219,7 +220,7 @@ public class TestJobModelManager {
 
     when(mockJobModel.getContainers()).thenReturn(containerMapping);
     when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>());
-    Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any());
+    Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any());
 
     JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockGrouperMetadata);
 
@@ -232,9 +233,9 @@ public class TestJobModelManager {
     // Verifications
     Mockito.verify(mockJobModel, atLeast(1)).getContainers();
     Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings((Iterable<String>) taskNames);
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id");
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id");
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id");
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id");
+    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
+    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
+    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);
+    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/0cb8cdbf/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 0f90dd5..305efcd 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -22,6 +22,7 @@ package org.apache.samza.serializers.model;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
@@ -31,6 +32,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStreamPartition;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -75,6 +77,20 @@ public class TestSamzaObjectMapper {
   }
 
   @Test
+  public void testSerializeTaskModel() throws IOException {
+    // Round trip: a Standby task model must be equal to itself after serialization and deserialization.
+    TaskModel taskModel = new TaskModel(new TaskName("Standby Partition 0"), new HashSet<>(), new Partition(0),
+        TaskMode.Standby);
+    String serializedString = this.samzaObjectMapper.writeValueAsString(taskModel);
+    TaskModel deserializedTaskModel = this.samzaObjectMapper.readValue(serializedString, TaskModel.class);
+    assertEquals(taskModel, deserializedTaskModel);
+
+    // JSON that carries no "task-mode" field is expected to deserialize to an Active task.
+    String sampleSerializedString = "{\"task-name\":\"Partition 0\",\"system-stream-partitions\":[],\"changelog-partition\":0}";
+    deserializedTaskModel = this.samzaObjectMapper.readValue(sampleSerializedString, TaskModel.class);
+    taskModel = new TaskModel(new TaskName("Partition 0"), new HashSet<>(), new Partition(0), TaskMode.Active);
+    assertEquals(taskModel, deserializedTaskModel);
+  }
+
+  @Test
   public void testDeserializeJobModel() throws IOException {
     ObjectNode asJson = buildJobModelJson();
     assertEquals(this.jobModel, deserializeFromObjectNode(asJson));
@@ -198,6 +214,7 @@ public class TestSamzaObjectMapper {
     containerModel1TaskTestJson.put("task-name", "test");
     containerModel1TaskTestJson.put("system-stream-partitions", containerModel1TaskTestSSPsJson);
     containerModel1TaskTestJson.put("changelog-partition", 2);
+    containerModel1TaskTestJson.put("task-mode", "Active");
 
     ObjectNode containerModel1TasksJson = objectMapper.createObjectNode();
     containerModel1TasksJson.put("test", containerModel1TaskTestJson);