Posted to commits@samza.apache.org by bh...@apache.org on 2020/12/22 21:20:07 UTC

[samza] branch master updated: API Changes to make Standby Container Allocation Rack Aware (#1453)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 708a6de  API Changes to make Standby Container Allocation Rack Aware (#1453)
708a6de is described below

commit 708a6dec8905d7671d3bf70d9124472e52d30cdc
Author: Pawas Chhokra <pc...@linkedin.com>
AuthorDate: Tue Dec 22 13:19:59 2020 -0800

    API Changes to make Standby Container Allocation Rack Aware (#1453)
    
    Feature: This feature makes all standby container requests rack aware, so that an active container and its corresponding standby containers are always placed on different racks. This reduces application downtime during rack failures. For the rack awareness functionality to be honored, the value of job.standbytasks.replication.factor must be at most 2.
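
The placement rule described above can be illustrated with a minimal sketch. It is not the Samza implementation: the class and method names (RackAwareStandbySketch, allowedRacksForStandby) are hypothetical, and the host-to-rack values simply mirror those used in MockFaultDomainManager in this commit. It assumes each host sits on exactly one rack.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; these names are illustrative and are not Samza classes.
class RackAwareStandbySketch {

  // Host-to-rack assignments, mirroring the values in MockFaultDomainManager.
  static final Map<String, String> HOST_TO_RACK = new HashMap<>();

  static {
    HOST_TO_RACK.put("host-1", "rack-1");
    HOST_TO_RACK.put("host-2", "rack-2");
    HOST_TO_RACK.put("host-3", "rack-1");
    HOST_TO_RACK.put("host-4", "rack-2");
    HOST_TO_RACK.put("host-5", "rack-3");
  }

  // Returns every known rack except the one hosting the active container.
  // An unknown active host excludes nothing, so all racks are returned.
  static Set<String> allowedRacksForStandby(String activeHost) {
    Set<String> allowed = new TreeSet<>(HOST_TO_RACK.values());
    String activeRack = HOST_TO_RACK.get(activeHost);
    if (activeRack != null) {
      allowed.remove(activeRack);
    }
    return allowed;
  }

  public static void main(String[] args) {
    // Active container on host-1 (rack-1): the standby may use rack-2 or rack-3.
    System.out.println(allowedRacksForStandby("host-1")); // prints [rack-2, rack-3]
  }
}
```

With a replication factor of 2 there is one standby per active container, so excluding the active container's rack is enough to keep the pair on different racks.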
    
    Changes:
    
    Added a new interface called FaultDomainManager, together with YarnFaultDomainManager, which implements it for Yarn. This class takes care of retrieving node-to-rack information from Yarn.
    Also defined what FaultDomain and FaultDomainType mean.
    Added config to enable making fault domain aware requests.
    Added metrics to track fault domain aware requests.
    
    API Changes: Added a new FaultDomainManager interface.
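
As a rough illustration of how a job might opt in, the fragment below sets the two properties added by this commit alongside the replication factor mentioned above. It is a sketch that assumes the job is configured through a standard Samza properties file; the factory value shown is just the documented default, written out explicitly.

```properties
# Standby replication factor; rack awareness is honored only for values of at most 2.
job.standbytasks.replication.factor=2

# Enable fault domain aware standby allocation (defaults to false).
cluster-manager.fault-domain-aware.standby.enabled=true

# Factory used to create the fault domain manager (this is the default value).
cluster-manager.fault-domain-manager.factory=org.apache.samza.job.yarn.YarnFaultDomainManagerFactory
```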
---
 .../versioned/jobs/samza-configurations.md         |   2 +
 .../apache/samza/clustermanager/FaultDomain.java   |  85 +++++++++
 .../samza/clustermanager/FaultDomainManager.java   |  58 +++++++
 .../clustermanager/FaultDomainManagerFactory.java  |  32 ++++
 .../samza/clustermanager/FaultDomainType.java      |  26 +++
 .../clustermanager/SamzaApplicationState.java      |  20 +++
 .../samza/clustermanager/SamzaResourceRequest.java |  36 +++-
 .../apache/samza/config/ClusterManagerConfig.java  |  17 ++
 .../metrics/ContainerProcessManagerMetrics.scala   |   6 +
 .../clustermanager/MockFaultDomainManager.java     |  60 +++++++
 .../MockFaultDomainManagerFactory.java             |  31 ++++
 .../samza/job/yarn/YarnFaultDomainManager.java     | 131 ++++++++++++++
 .../job/yarn/YarnFaultDomainManagerFactory.java    |  34 ++++
 .../samza/job/yarn/TestYarnFaultDomainManager.java | 190 +++++++++++++++++++++
 14 files changed, 726 insertions(+), 2 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1caf88e..96e994a 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -338,6 +338,8 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
+|cluster-manager.fault-domain-aware.standby.enabled|false|When set to true, this property makes standby container allocation fault domain aware. If you enable it, you will also need to configure `cluster-manager.fault-domain-manager.factory`.|
+|cluster-manager.fault-domain-manager.factory|`org.apache.samza.`<br>`job.yarn.`<br>`YarnFaultDomainManagerFactory`|The fully qualified name of the Java factory class used to create the fault domain manager for the cluster manager in use, when making standby container allocations fault domain aware. This configuration applies only when `cluster-manager.fault-domain-aware.standby.enabled` is set to true.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
new file mode 100644
index 0000000..a5ebdd4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain contains one or more hosts.
+ * A cluster can comprise hosts spread across multiple fault domains.
+ */
+public class FaultDomain {
+
+  private final FaultDomainType type;
+  private final String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    Preconditions.checkNotNull(type, "Fault domain type (ex:rack) cannot be null.");
+    Preconditions.checkNotNull(id, "Fault domain ID (rack ID) cannot be null.");
+
+    this.type = type;
+    this.id = id;
+  }
+
+  /**
+   * Gets the type of fault domain, for example: rack.
+   * @return Type of fault domain
+   */
+  public FaultDomainType getType() {
+    return type;
+  }
+
+  /**
+   * Gets the id of the fault domain, for example: rack ID.
+   * @return fault domain ID
+   */
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return " {" +
+            "type = " + type +
+            ", id = " + id +
+            "} ";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FaultDomain that = (FaultDomain) o;
+    return Objects.equal(type, that.type) && Objects.equal(id, that.id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(type, id);
+  }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
new file mode 100644
index 0000000..3f4e0a9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster, for all hosts that are healthy, up and running.
+   * This set might not be up to date with the current state of the cluster, as its freshness is an implementation detail.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domains that a particular host belongs to, based on the internal cache.
+   * @param host the host
+   * @return the set of {@link FaultDomain}s for the host
+   */
+  Set<FaultDomain> getFaultDomainsForHost(String host);
+
+  /**
+   * This method returns true if the fault domains on which these two hosts reside are exactly the same, false otherwise.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean hasSameFaultDomains(String host1, String host2);
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
new file mode 100644
index 0000000..cef0148
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * A factory to build a {@link FaultDomainManager}.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManagerFactory {
+
+  FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
new file mode 100644
index 0000000..ca2fdd1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+/**
+ * This enum defines the types of fault domain that can be used, depending on the environment.
+ */
+public enum FaultDomainType {
+    RACK
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 930d366..f442094 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -181,6 +181,26 @@ public class SamzaApplicationState {
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a job.
+   */
+  public final AtomicInteger faultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of fault domain aware containers started for a job.
+   */
+  public final AtomicInteger faultDomainAwareContainersStarted = new AtomicInteger(0);
+
+  /**
+   * Number of expired fault domain aware container requests made for a job.
+   */
+  public final AtomicInteger expiredFaultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of failed fault domain aware container allocations for a job.
+   */
+  public final AtomicInteger failedFaultDomainAwareContainerAllocations = new AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 5ca8b24..be804c9 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -19,7 +19,11 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.time.Instant;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +53,10 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
    */
   private final String preferredHost;
   /**
+   * The set of fault domains on which the resource must be allocated.
+   */
+  private final Set<FaultDomain> faultDomains;
+  /**
    * A request is identified by an unique identifier.
    */
   private final String requestId;
@@ -63,7 +71,11 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), ImmutableSet.of());
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Set<FaultDomain> faultDomains) {
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), faultDomains);
   }
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp) {
@@ -73,7 +85,22 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
     this.requestTimestamp = requestTimestamp;
-    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);
+    this.faultDomains = new HashSet<>();
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}, and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains);
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp, Set<FaultDomain> faultDomains) {
+    Preconditions.checkNotNull(faultDomains, "Set of fault domains should not be null.");
+    this.numCores = numCores;
+    this.memoryMB = memoryMB;
+    this.preferredHost = preferredHost;
+    this.requestId = UUID.randomUUID().toString();
+    this.processorId = processorId;
+    this.requestTimestamp = requestTimestamp;
+    this.faultDomains = faultDomains;
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {} and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains.toString());
   }
 
   public String getProcessorId() {
@@ -96,6 +123,10 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
     return preferredHost;
   }
 
+  public Set<FaultDomain> getFaultDomains() {
+    return faultDomains;
+  }
+
   public int getMemoryMB() {
     return memoryMB;
   }
@@ -109,6 +140,7 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + faultDomains.toString() +
             '}';
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 231bdda..3f27991 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -38,6 +38,15 @@ public class ClusterManagerConfig extends MapConfig {
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "cluster-manager.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnFaultDomainManagerFactory";
+
+  /**
+   * Determines whether standby allocation is fault domain aware or not.
+   */
+  public static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "cluster-manager.fault-domain-aware.standby.enabled";
+  public static final boolean FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false;
+
   /**
    * Sleep interval for the allocator thread in milliseconds
    */
@@ -250,6 +259,14 @@ public class ClusterManagerConfig extends MapConfig {
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
+  public String getFaultDomainManagerClass() {
+    return get(FAULT_DOMAIN_MANAGER_FACTORY, FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT);
+  }
+
+  public boolean getFaultDomainAwareStandbyEnabled() {
+    return getBoolean(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT);
+  }
+
   public boolean getJmxEnabledOnJobCoordinator() {
     if (containsKey(CLUSTER_MANAGER_JMX_ENABLED)) {
       log.warn("Configuration {} is deprecated. Please use {}", CLUSTER_MANAGER_JMX_ENABLED, JOB_JMX_ENABLED);
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 91fec28..96588c9 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -64,4 +64,10 @@ class ContainerProcessManagerMetrics(val config: Config,
 
   val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
   val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
+
+  val mFaultDomainAwareContainerRequests = newGauge("fault-domain-aware-container-requests", () => state.faultDomainAwareContainerRequests.get())
+  val mFaultDomainAwareContainersStarted = newGauge("fault-domain-aware-containers-started", () => state.faultDomainAwareContainersStarted.get())
+  val mExpiredFaultDomainAwareContainerRequests = newGauge("expired-fault-domain-aware-container-requests", () => state.expiredFaultDomainAwareContainerRequests.get())
+  val mFailedFaultDomainAwareContainerRequests = newGauge("failed-fault-domain-aware-container-requests", () => state.failedFaultDomainAwareContainerAllocations.get())
+
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java
new file mode 100644
index 0000000..0c3b455
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class MockFaultDomainManager implements FaultDomainManager {
+
+  private final Multimap<String, FaultDomain> hostToFaultDomainMap;
+
+  public MockFaultDomainManager(MetricsRegistry metricsRegistry) {
+    FaultDomain faultDomain1 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain2 = new FaultDomain(FaultDomainType.RACK, "rack-2");
+    FaultDomain faultDomain3 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain4 = new FaultDomain(FaultDomainType.RACK, "rack-2");
+    FaultDomain faultDomain5 = new FaultDomain(FaultDomainType.RACK, "rack-3");
+    hostToFaultDomainMap = HashMultimap.create();
+    hostToFaultDomainMap.put("host-1", faultDomain1);
+    hostToFaultDomainMap.put("host-2", faultDomain2);
+    hostToFaultDomainMap.put("host-3", faultDomain3);
+    hostToFaultDomainMap.put("host-4", faultDomain4);
+    hostToFaultDomainMap.put("host-5", faultDomain5);
+  }
+
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToFaultDomainMap.values());
+  }
+
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    return new HashSet<>(hostToFaultDomainMap.get(host));
+  }
+
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    return hostToFaultDomainMap.get(host1).equals(hostToFaultDomainMap.get(host2));
+  }
+
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java
new file mode 100644
index 0000000..4dc4dce
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class MockFaultDomainManagerFactory implements FaultDomainManagerFactory {
+
+  @Override
+  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+    return new MockFaultDomainManager(metricsRegistry);
+  }
+
+}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
new file mode 100644
index 0000000..2bafa78
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class assumes that job.standbytasks.replication.factor is 2.
+ * For values greater than 2, a standby container could be placed on the same rack as the active container, or on a rack that already hosts another standby.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private static final Logger log = LoggerFactory.getLogger(YarnFaultDomainManager.class);
+  private static final String FAULT_DOMAIN_MANAGER_GROUP = "yarn-fault-domain-manager";
+  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = "host-to-fault-domain-cache-updates";
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final YarnClientImpl yarnClient;
+  private Counter hostToFaultDomainCacheUpdates;
+
+  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.yarnClient = yarnClient;
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = hostToRackMap;
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  /**
+   * This method returns all the last cached rack values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular host resides on based on the internal cache.
+   * In case the rack of a host does not exist in this cache, we update the cache by computing the host to rack map again using Yarn.
+   * @param host the host
+   * @return the set of {@link FaultDomain}s (racks) for the host
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || !hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
+  }
+
+  /**
+   * This method computes the host to rack map from Yarn.
+   * Only hosts whose nodes Yarn reports as RUNNING are included in this map.
+   * @return map of the host and the rack it resides on
+   */
+  @VisibleForTesting
+  Multimap<String, FaultDomain> computeHostToFaultDomainMap() {
+    Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
+    try {
+      List<NodeReport> nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+      nodeReports.forEach(report -> {
+        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
+        hostToRackMap.put(report.getNodeId().getHost(), rack);
+      });
+      log.info("Computed the host to rack map successfully from Yarn.");
+    } catch (YarnException | IOException e) {
+      throw new SamzaException("Yarn threw an exception while getting NodeReports.", e);
+    }
+    return hostToRackMap;
+  }
+}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java
new file mode 100644
index 0000000..e4e547c
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainManagerFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * A factory to build a {@link YarnFaultDomainManager}.
+ */
+public class YarnFaultDomainManagerFactory implements FaultDomainManagerFactory {
+  @Override
+  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+    return new YarnFaultDomainManager(metricsRegistry);
+  }
+}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java
new file mode 100644
index 0000000..9216088
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestYarnFaultDomainManager {
+  private final Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
+  private final String hostName1 = "host1";
+  private final String hostName2 = "host2";
+  private final String hostName3 = "host3";
+  private final String hostName4 = "host4";
+  private final String hostName5 = "host5";
+  private final String hostName6 = "host6";
+  private final String rackName1 = "rack1";
+  private final String rackName2 = "rack2";
+  private final String rackName3 = "rack3";
+
+  private final NodeReport nodeReport1 = createNodeReport(hostName1, 1, NodeState.RUNNING, "httpAddress1",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport2 = createNodeReport(hostName2, 1, NodeState.RUNNING, "httpAddress2",
+          rackName2, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport3 = createNodeReport(hostName3, 1, NodeState.RUNNING, "httpAddress3",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport4 = createNodeReport(hostName4, 1, NodeState.RUNNING, "httpAddress4",
+          rackName2, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport5 = createNodeReport(hostName5, 1, NodeState.RUNNING, "httpAddress5",
+          rackName3, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport6 = createNodeReport(hostName6, 1, NodeState.RUNNING, "httpAddress6",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+
+  @Mock
+  YarnClientImpl yarnClient;
+  @Mock
+  ReadableMetricsRegistry mockMetricsRegistry;
+  @Mock
+  Counter mockCounter;
+
+  @Before
+  public void setup() {
+    FaultDomain rack1 = new FaultDomain(FaultDomainType.RACK, rackName1);
+    FaultDomain rack2 = new FaultDomain(FaultDomainType.RACK, rackName2);
+    FaultDomain rack3 = new FaultDomain(FaultDomainType.RACK, rackName3);
+    hostToRackMap.put(hostName1, rack1);
+    hostToRackMap.put(hostName2, rack2);
+    hostToRackMap.put(hostName3, rack1);
+    hostToRackMap.put(hostName4, rack2);
+    hostToRackMap.put(hostName5, rack3);
+
+    when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
+  }
+
+  @Test
+  public void testGetFaultDomainOfHostWhichExistsInCache() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    Set<FaultDomain> expectedFaultDomainSet = new HashSet<>();
+    expectedFaultDomainSet.add(new FaultDomain(FaultDomainType.RACK, rackName1));
+
+    Set<FaultDomain> actualFaultDomainSet = yarnFaultDomainManager.getFaultDomainsForHost(hostName3);
+
+    assertNotNull(actualFaultDomainSet);
+    assertEquals(expectedFaultDomainSet.iterator().next(), actualFaultDomainSet.iterator().next());
+    verify(mockCounter, never()).inc();
+  }
+
+  @Test
+  public void testGetFaultDomainOfHostWhichDoesNotExistInCache() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    Set<FaultDomain> expectedFaultDomainSet = new HashSet<>();
+    expectedFaultDomainSet.add(new FaultDomain(FaultDomainType.RACK, rackName1));
+
+    List<NodeReport> updatedNodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5, nodeReport6);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(updatedNodeReport);
+
+    Set<FaultDomain> actualFaultDomainSet = yarnFaultDomainManager.getFaultDomainsForHost(hostName6);
+
+    assertNotNull(actualFaultDomainSet);
+    assertEquals(expectedFaultDomainSet.iterator().next(), actualFaultDomainSet.iterator().next());
+    verify(mockCounter, times(1)).inc();
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenTrue() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName3);
+
+    assertTrue(result);
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenFalse() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName2);
+
+    assertFalse(result);
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenHostDoesNotExistInCache() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    List<NodeReport> updatedNodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5, nodeReport6);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(updatedNodeReport);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName6);
+
+    assertTrue(result);
+  }
+
+  @Test
+  public void testComputeHostToFaultDomainMap() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, null);
+
+    List<NodeReport> nodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(nodeReport);
+
+    Multimap<String, FaultDomain> hostToRackMap = yarnFaultDomainManager.computeHostToFaultDomainMap();
+
+    assertEquals(this.hostToRackMap.size(), hostToRackMap.size());
+    assertEquals(this.hostToRackMap.keySet(), hostToRackMap.keySet());
+    Iterator<FaultDomain> expectedValues = this.hostToRackMap.values().iterator();
+    Iterator<FaultDomain> computedValues = hostToRackMap.values().iterator();
+    expectedValues.forEachRemaining(expectedRack -> assertFaultDomainEquals(expectedRack, computedValues.next()));
+  }
+
+  private void assertFaultDomainEquals(FaultDomain faultDomain1, FaultDomain faultDomain2) {
+    assertEquals(faultDomain1.getType(), faultDomain2.getType());
+    assertEquals(faultDomain1.getId(), faultDomain2.getId());
+  }
+
+  private NodeReport createNodeReport(String host, int port, NodeState nodeState, String httpAddress, String rackName,
+                                      int memoryUsed, int vcoresUsed, int totalMemory, int totalVcores, int numContainers,
+                                      String healthReport, long lastHealthReportTime, Set<String> nodeLabels) {
+    return NodeReport.newInstance(NodeId.newInstance(host, port), nodeState, httpAddress, rackName,
+            Resource.newInstance(memoryUsed, vcoresUsed), Resource.newInstance(totalMemory, totalVcores), numContainers,
+            healthReport, lastHealthReportTime, nodeLabels);
+  }
+}