You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/12/08 19:08:18 UTC

[samza] branch master updated: SAMZA-2609: Data model changes to track lifecycle information of job coordinator (#1449)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1962946  SAMZA-2609: Data model changes to track lifecycle information of job coordinator (#1449)
1962946 is described below

commit 1962946d2cf6804fa92c15914daa8d48a14f5e09
Author: mynameborat <bh...@gmail.com>
AuthorDate: Tue Dec 8 11:08:08 2020 -0800

    SAMZA-2609: Data model changes to track lifecycle information of job coordinator (#1449)
    
    Description:
    With AM HA, the new AM will regenerate job model, do job planning and rewrite configurations and may have to spin up containers. The new rewritten configurations, job model can change from the existing metadata on which the containers are running. In order to handle this change of metadata, we need a way to identify lifecycle of the AM and store some information on the previous snapshot of the metadata.
    
    Changes:
    Epoch identifier of the AM attempt so that we can determine if the new attempt is part of the same lifecycle of AM or a new deployment lifecycle
    Metadata on the job model, configuration so, that we can use this information to decide the course of action if current metadata changed in comparison with the previous snapshot.
    Metrics capturing the changes in metadata
    Coordinator stream message updates for new metadata
    The code path as is not exercised actively and will be leveraged by the PRs to follow.
    
    API Changes:
    Added new Coordinator stream message type
    Introduced data model for job coordinator metadata internal to samza
---
 .../apache/samza/job/JobCoordinatorMetadata.java   | 112 ++++++++
 .../coordinator/JobCoordinatorMetadataManager.java | 289 +++++++++++++++++++++
 .../stream/CoordinatorStreamValueSerde.java        |   7 +
 .../messages/SetJobCoordinatorMetadataMessage.java |  56 ++++
 .../model/JsonJobCoordinatorMetadataMixIn.java     |  44 ++++
 .../samza/serializers/model/SamzaObjectMapper.java |   7 +
 .../TestJobCoordinatorMetadataManager.java         | 199 ++++++++++++++
 7 files changed, 714 insertions(+)

diff --git a/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java b/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
new file mode 100644
index 0000000..464941d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of job coordinator
+ * scoped to attempt within a deployment. For the purpose of this data model, deployment and attempt are defined
+ * as follows
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of application within a deployment for fault tolerance; e.g. Job coordinator failures or
+ *              job model changes detected by partition count monitoring or regex monitor.
+ *
+ * Metadata generation may require underlying cluster manager's interaction. The following describes the properties
+ * of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier to associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempt within a single deployment lifecycle
+ *    4. Changes across deployment lifecycle
+ *
+ * Config ID - An identifier associated with a subset of configuration snapshot used by the job in an application attempt.
+ * Current prefixes that impacts the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows
+ *    1. Unique and Reproducible
+ *    2. Remains unchanged across application attempts / deployments as long as the subset of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. JobModel
+ * has both configurations and list of container model. We don't account for changes in the configuration as part of this
+ * identifier since it is separately tracked and handled by Config ID.
+ * The properties of the job model identifier are as follows
+ *    1. Unique and Reproducible
+ *    2. Remains unchanged across application attempts / deployments as long as the work assignment remains unchanged
+ *
+ * Notes on interface stability - It is used internally by Samza for job coordinator high availability in YARN
+ * deployment offering. It may evolve depending on expanding the scope beyond YARN and hence unstable.
+ *
+ */
+@InterfaceStability.Unstable
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String jobModelId) {
+    Preconditions.checkState(StringUtils.isNotBlank(epochId), "Epoch ID cannot be empty");
+    Preconditions.checkState(StringUtils.isNotBlank(configId), "Config ID cannot be empty");
+    Preconditions.checkState(StringUtils.isNotBlank(jobModelId), "Job Model ID cannot be empty");
+    this.configId = configId;
+    this.epochId = epochId;
+    this.jobModelId = jobModelId;
+  }
+
+  public String getConfigId() {
+    return configId;
+  }
+
+  public String getEpochId() {
+    return this.epochId;
+  }
+
+  public String getJobModelId() {
+    return jobModelId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JobCoordinatorMetadata)) {
+      return false;
+    }
+    JobCoordinatorMetadata metadata = (JobCoordinatorMetadata) o;
+    return Objects.equals(configId, metadata.configId) && Objects.equals(epochId, metadata.epochId)
+        && Objects.equals(jobModelId, metadata.jobModelId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(configId, epochId, jobModelId);
+  }
+
+  @Override
+  public String toString() {
+    return "JobCoordinatorMetadata{" + "configId='" + configId + '\'' + ", epochId='" + epochId + '\''
+        + ", jobModelId='" + jobModelId + '\'' + '}';
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
new file mode 100644
index 0000000..c5d72f5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage read and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides
+ * additional helper functionalities to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final ClusterType clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry) {
+    Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+
+    applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more
+   * details on how it is generated and the properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier that is generated based on the input {@link Config}. It uses
+   * a {@link Funnel} to use a subset of the input configuration to generate the identifier and as long as the subset
+   * of the configuration remains same, the identifier is guaranteed to be same. For the list of config prefixes used
+   * by the funnel refer to {@link ConfigHashFunnel}
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only
+   * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We
+   * serialize the data into bytes and use those bytes to compute the identifier.
+   *
+   * In case of YARN, the epoch identifier is extracted from the application attempt and translates to applicationId
+   * e.g. 1606797336059_0010
+   * Both config and job model identifiers should a 32 bit integer.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
+    try {
+      int jobModelId = Hashing
+          .crc32c()
+          .hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      LOG.error("Failed to generate metadata for the current attempt due to ", e);
+      throw new RuntimeException("Failed to generate the metadata for the current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Check for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes within
+   * {@linkplain JobCoordinatorMetadata} changes.
+   *
+   * We intentionally check for each changes to help us track at this granularity. We want to use this information
+   * to determine if complex handling is required to cater these changes instead of blindly restarting all the
+   * containers upstream.
+   *
+   * @param newMetadata new metadata to be compared
+   * @param previousMetadata previous metadata to be compared against
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata
+   * associated with cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.name().equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          LOG.error("Failed to read job coordinator metadata due to ", e);
+        }
+      }
+    }
+
+    LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", clusterType, metadata);
+    return metadata;
+  }
+
+  /**
+   * Persist the {@link JobCoordinatorMetadata} in metadata store. The job coordinator metadata is associated
+   * with the cluster type specified at the creation of the manager.
+   *
+   * @param metadata metadata to be persisted
+   *
+   * @throws SamzaException in case of exception encountered during the writes to underlying metadata store
+   */
+  public void writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
+    Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be null");
+
+    try {
+      String metadataValueString = metadataMapper.writeValueAsString(metadata);
+      metadataStore.put(clusterType.name(), valueSerde.toBytes(metadataValueString));
+      LOG.info("Successfully written job coordinator metadata: {} for cluster {}.", metadata, clusterType);
+    } catch (Exception e) {
+      LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);
+      throw new SamzaException("Failed to write the job coordinator metadata.", e);
+    }
+  }
+
+  @VisibleForTesting
+  Counter getApplicationAttemptCount() {
+    return applicationAttemptCount;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
+    return jobModelChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
+    return configChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getNewDeployment() {
+    return newDeployment;
+  }
+
+  @VisibleForTesting
+  String getEnvProperty(String propertyName) {
+    return System.getenv(propertyName);
+  }
+
+  /**
+   * Generate the epoch id using the execution container id that is passed through system environment. This isn't ideal
+   * way of generating this ID and we will need some contract between the underlying cluster manager and samza engine
+   * around what the epoch ID should be like and what is needed to generate is across different cluster offerings.
+   * Due to unknowns defined above, we leave it as is and keep it simple for now. It is favorable to keep it this way
+   * instead of introducing a loosely defined interface/API and marking it unstable.
+   *
+   * The properties of the epoch identifier are as follows
+   *  1. Unique across applications in the cluster
+   *  2. Remains unchanged within a single deployment lifecycle
+   *  3. Remains unchanged across application attempt within a single deployment lifecycle
+   *  4. Changes across deployment lifecycle
+   *
+   *  Note: The above properties is something we want keep intact when extracting this into a well defined interface
+   *  or contract for YARN AM HA to work.
+   *  The format and property used to generate ID is specific to YARN and the specific format of the container name
+   *  is a public contract by YARN which is likely to remain backward compatible.
+   *
+   * @return an identifier associated with the job coordinator satisfying the above properties
+   */
+  private String fetchEpochIdForJobCoordinator() {
+    String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+  }
+
+  /**
+   * A helper class to generate hash for the {@link Config} based on with a subset of configuration.
+   * The subset of configuration used are configurations that prefix match the allowed prefixes.
+   */
+  private static class ConfigHashFunnel implements Funnel<Config> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigHashFunnel.class);
+    // using sorted set to ensure the hash computation on configurations is reproducible and deterministic
+    private static final SortedSet<String> ALLOWED_PREFIXES = ImmutableSortedSet.of("job.autosizing");
+    @Override
+    public void funnel(Config from, PrimitiveSink into) {
+      SortedMap<String, String> map = new TreeMap<>();
+
+      ALLOWED_PREFIXES.forEach(prefix -> map.putAll(from.subset(prefix, false)));
+      LOG.info("Using the config {} to generate hash", map);
+      map.forEach((key, value) -> {
+        into.putUnencodedChars(key);
+        into.putUnencodedChars(value);
+      });
+    }
+  }
+
+  /**
+   * Type of the cluster deployment associated with the {@link JobCoordinatorMetadataManager}
+   */
+  public enum ClusterType {
+    YARN
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index 86983f1..c2df96c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -24,6 +24,7 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -73,6 +74,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetTaskPartitionMapping.TYPE)) {
       SetTaskPartitionMapping setTaskPartitionMapping = new SetTaskPartitionMapping(message);
       return setTaskPartitionMapping.getTaskNames();
+    } else if (type.equalsIgnoreCase(SetJobCoordinatorMetadataMessage.TYPE)) {
+      SetJobCoordinatorMetadataMessage jobCoordinatorMetadataMessage = new SetJobCoordinatorMetadataMessage(message);
+      return jobCoordinatorMetadataMessage.getJobCoordinatorMetadata();
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
@@ -102,6 +106,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetTaskPartitionMapping.TYPE)) {
       SetTaskPartitionMapping setTaskPartitionMapping = new SetTaskPartitionMapping(SOURCE, "", value);
       return messageSerde.toBytes(setTaskPartitionMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetJobCoordinatorMetadataMessage.TYPE)) {
+      SetJobCoordinatorMetadataMessage jobCoordinatorMetadataMessage = new SetJobCoordinatorMetadataMessage(SOURCE, "", value);
+      return messageSerde.toBytes(jobCoordinatorMetadataMessage.getMessageMap());
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java
new file mode 100644
index 0000000..55c37f4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetJobCoordinatorMetadataMessage is used internally by the Samza framework to
+ * persist {@link org.apache.samza.job.JobCoordinatorMetadata} in coordinator stream.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: meta-key
+ *     Type: set-job-coordinator-metadata
+ *     Source: "SamzaContainer"
+ *     MessageMap:
+ *     {
+ *         "epoch-id": epoch identifier of the job coordinator,
+ *         "job-model-id": identifier associated with the snapshot of job model used by the job coordinator,
+ *         "config-id": identifier associated with the snapshot of the configuration used by the job coordinator
+ *     }
+ * }
+ * */
+public class SetJobCoordinatorMetadataMessage extends CoordinatorStreamMessage {
+  private static final String META_KEY = "meta-key";
+  public static final String TYPE = "set-job-coordinator-metadata";
+
+  public SetJobCoordinatorMetadataMessage(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  public SetJobCoordinatorMetadataMessage(String source, String clusterType, String metaMessage) {
+    super(source);
+    setType(TYPE);
+    setKey(clusterType);
+    putMessageValue(META_KEY, metaMessage);
+  }
+
+  public String getJobCoordinatorMetadata() {
+    return getMessageValue(META_KEY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java
new file mode 100644
index 0000000..5aab8c1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.JobCoordinatorMetadata} to/from JSON
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonJobCoordinatorMetadataMixIn {
+
+  @JsonCreator
+  public JsonJobCoordinatorMetadataMixIn(@JsonProperty("epoch-id") String epochId,
+      @JsonProperty("config-id") String configId, @JsonProperty("job-model-id") String jobModelId) {
+  }
+
+  @JsonProperty("epoch-id")
+  abstract String getEpochId();
+
+  @JsonProperty("config-id")
+  abstract String getConfigId();
+
+  @JsonProperty("job-model-id")
+  abstract String getJobModelId();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index db147f0..3a0205d 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -27,6 +27,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -137,6 +138,12 @@ public class SamzaObjectMapper {
     mapper.getSerializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
 
+    // Register mixins for job coordinator metadata model
+    mapper.getSerializationConfig()
+        .addMixInAnnotations(JobCoordinatorMetadata.class, JsonJobCoordinatorMetadataMixIn.class);
+    mapper.getDeserializationConfig()
+        .addMixInAnnotations(JobCoordinatorMetadata.class, JsonJobCoordinatorMetadataMixIn.class);
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
new file mode 100644
index 0000000..bd177cc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.ClusterType;
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * A test class for {@link JobCoordinatorMetadataManager}
+ */
+public class TestJobCoordinatorMetadataManager {
+  private static final String OLD_CONFIG_ID = "1";
+  private static final String OLD_JOB_MODEL_ID = "1";
+  private static final String OLD_EPOCH_ID = "1606797336059" + CONTAINER_ID_DELIMITER + "0010";
+  private static final String OLD_CONTAINER_ID = "CONTAINER" + CONTAINER_ID_DELIMITER + OLD_EPOCH_ID +
+      CONTAINER_ID_DELIMITER + "00002";
+
+  private static final String NEW_CONFIG_ID = "2";
+  private static final String NEW_JOB_MODEL_ID = "2";
+  private static final String NEW_EPOCH_ID = "1606797336059" + CONTAINER_ID_DELIMITER + "0011";
+
+  private static final Config OLD_CONFIG = new MapConfig(
+      ImmutableMap.of(
+          "job.autosizing.enabled", "true",
+          "job.autosizing.cpu.core", "16"));
+
+  private static final Config NEW_CONFIG = new MapConfig(
+      ImmutableMap.of(
+          "job.autosizing.enabled", "true",
+          "job.autosizing.cpu.core", "24"));
+
+  private static final Config COORDINATOR_STORE_CONFIG =
+      new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
+
+  private JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private Map<String, ContainerModel> containerModelMap;
+  private MetadataStore metadataStore;
+
+  @Before
+  public void setup() {
+    Map<TaskName, TaskModel> tasksForContainer1 = ImmutableMap.of(
+        new TaskName("t1"), new TaskModel(new TaskName("t1"), ImmutableSet.of(), new Partition(0)),
+        new TaskName("t2"), new TaskModel(new TaskName("t2"), ImmutableSet.of(), new Partition(1)));
+    Map<TaskName, TaskModel> tasksForContainer2 = ImmutableMap.of(
+        new TaskName("t3"), new TaskModel(new TaskName("t3"), ImmutableSet.of(), new Partition(2)),
+        new TaskName("t4"), new TaskModel(new TaskName("t4"), ImmutableSet.of(), new Partition(3)),
+        new TaskName("t5"), new TaskModel(new TaskName("t5"), ImmutableSet.of(), new Partition(4)));
+    ContainerModel containerModel1 = new ContainerModel("0", tasksForContainer1);
+    ContainerModel containerModel2 = new ContainerModel("1", tasksForContainer2);
+    containerModelMap = ImmutableMap.of("0", containerModel1, "1", containerModel2);
+    CoordinatorStreamStoreTestUtil mockCoordinatorStreamStore =
+        new CoordinatorStreamStoreTestUtil(COORDINATOR_STORE_CONFIG);
+    metadataStore = spy(new NamespaceAwareCoordinatorStreamStore(
+        mockCoordinatorStreamStore.getCoordinatorStreamStore(), SetJobCoordinatorMetadataMessage.TYPE));
+    jobCoordinatorMetadataManager = spy(new JobCoordinatorMetadataManager(metadataStore,
+        ClusterType.YARN, new MetricsRegistryMap()));
+  }
+
+  @Test
+  public void testCheckForMetadataChanges() {
+    JobCoordinatorMetadata previousMetadata = new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadataWithDifferentEpochId =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+
+    boolean metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentEpochId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+        jobCoordinatorMetadataManager.getNewDeployment().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithDifferentConfigId =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, OLD_JOB_MODEL_ID);
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentConfigId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("Config across application attempts should be 1", 1,
+        jobCoordinatorMetadataManager.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithDifferentJobModelId =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, NEW_JOB_MODEL_ID);
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentJobModelId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("Job model changed across application attempts should be 1", 1,
+        jobCoordinatorMetadataManager.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithNoChange =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    assertEquals("Application attempt count should be 0", 0,
+        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
+    assertFalse("Metadata check should return false", metadataChanged);
+    assertEquals("Application attempt count should be 1", 1,
+        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+  }
+
+  @Test
+  public void testGenerateJobCoordinatorMetadataForRepeatability() {
+    when(jobCoordinatorMetadataManager.getEnvProperty(CONTAINER_ID_PROPERTY))
+        .thenReturn(OLD_CONTAINER_ID);
+    JobCoordinatorMetadata expectedMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+
+    assertEquals("Mismatch in epoch identifier.", OLD_EPOCH_ID, expectedMetadata.getEpochId());
+
+    JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+    assertEquals("Expected repeatable job coordinator metadata", expectedMetadata, actualMetadata);
+  }
+
+  @Test
+  public void testGenerateJobCoordinatorMetadataWithConfigChanges() {
+    when(jobCoordinatorMetadataManager.getEnvProperty(CONTAINER_ID_PROPERTY))
+        .thenReturn(OLD_CONTAINER_ID);
+    JobCoordinatorMetadata expectedMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+
+    Map<String, String> additionalConfig = new HashMap<>();
+    additionalConfig.put("yarn.am.high-availability.enabled", "true");
+
+    additionalConfig.putAll(OLD_CONFIG);
+    Config modifiedConfig = new MapConfig(additionalConfig);
+    JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(modifiedConfig, containerModelMap), modifiedConfig);
+    assertEquals("Job coordinator metadata should remain the same", expectedMetadata, actualMetadata);
+  }
+
+  @Test
+  public void testReadWriteJobCoordinatorMetadata() {
+    JobCoordinatorMetadata jobCoordinatorMetadata =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(jobCoordinatorMetadata);
+
+    JobCoordinatorMetadata actualJobCoordinatorMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    assertEquals("Mismatch in job coordinator metadata", jobCoordinatorMetadata, actualJobCoordinatorMetadata);
+  }
+
+  @Test (expected = NullPointerException.class)
+  public void testWriteNullJobCoordinatorMetadataShouldThrowException() {
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(null);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testWriteJobCoordinatorMetadataBubblesException() {
+    doThrow(new RuntimeException("failed to write to coordinator stream"))
+        .when(metadataStore).put(anyString(), any());
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(mock(JobCoordinatorMetadata.class));
+  }
+}