You are viewing a plain-text version of this content; the link to the canonical version was not preserved.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/26 00:17:39 UTC

samza git commit: SAMZA-1687: Prioritize preferred host requests over ANY-HOST requests

Repository: samza
Updated Branches:
  refs/heads/master 66525b51e -> 48b17be33


SAMZA-1687: Prioritize preferred host requests over ANY-HOST requests

I am working on documentation that describes this in more detail. In short (TL;DR): we should prioritize preferred-host requests over ANY_HOST requests.

YARN enforces these two checks:
1. ANY_HOST requests must always be made with relax-locality = true.
2. A request with relax-locality = false must not share a priority level with another request that has relax-locality = true.

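Since the Samza AM makes preferred-host requests with relax-locality = false, ANY_HOST requests must use a different priority level. We can safely give preferred-host requests a higher priority than ANY_HOST requests, because data locality is critical. (In YARN, a numerically lower priority value is the higher priority. This change therefore issues preferred-host requests at priority 0 and ANY_HOST requests at priority 1.)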

Author: Jagadish <jv...@linkedin.com>

Closes #488 from vjagadish/priority-host-affinity


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/48b17be3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/48b17be3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/48b17be3

Branch: refs/heads/master
Commit: 48b17be33d19ca9346cbbade0cca14ba334319ba
Parents: 66525b5
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Apr 25 17:17:34 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Apr 25 17:17:34 2018 -0700

----------------------------------------------------------------------
 .../clustermanager/ResourceRequestState.java    | 10 ++++--
 .../job/yarn/YarnClusterResourceManager.java    | 34 +++++++++++---------
 2 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/48b17be3/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 51caa39..2dc31bf 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * this state is shared across both the Allocator Thread, and the Callback handler thread.
  *
  */
-public class ResourceRequestState {
+public class  ResourceRequestState {
   private static final Logger log = LoggerFactory.getLogger(ResourceRequestState.class);
   public static final String ANY_HOST = "ANY_HOST";
 
@@ -190,8 +190,12 @@ public class ResourceRequestState {
     synchronized (lock) {
       requestsQueue.remove(request);
       // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer.
-      allocatedResources.get(assignedHost).remove(samzaResource);
-      allocatedResources.get(ANY_HOST).remove(samzaResource);
+      if (allocatedResources.get(assignedHost) != null) {
+        allocatedResources.get(assignedHost).remove(samzaResource);
+      }
+      if (allocatedResources.get(ANY_HOST) != null) {
+        allocatedResources.get(ANY_HOST).remove(samzaResource);
+      }
       if (hostAffinityEnabled) {
         // assignedHost may not always be the preferred host.
         // Hence, we should safely decrement the counter for the preferredHost

http://git-wip-us.apache.org/repos/asf/samza/blob/48b17be3/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index f8c7d9b..407768c 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -79,6 +79,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
 
+  private static final int PREFERRED_HOST_PRIORITY = 0;
+  private static final int ANY_HOST_PRIORITY = 1;
+
   private final String INVALID_YARN_CONTAINER_ID = "-1";
 
   /**
@@ -213,7 +216,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   @Override
   public void requestResources(SamzaResourceRequest resourceRequest) {
-    final int DEFAULT_PRIORITY = 0;
     log.info("Requesting resources on  " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
 
     int memoryMb = resourceRequest.getMemoryMB();
@@ -221,25 +223,27 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     String containerLabel = yarnConfig.getContainerLabel();
     String preferredHost = resourceRequest.getPreferredHost();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
-    Priority priority =  Priority.newInstance(DEFAULT_PRIORITY);
 
     AMRMClient.ContainerRequest issuedRequest;
 
-    if (preferredHost.equals("ANY_HOST"))
-    {
-      log.info("Making a request for ANY_HOST " + preferredHost );
-      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, true, containerLabel);
+    /*
+     * Yarn enforces these two checks:
+     *   1. ANY_HOST requests should always be made with relax-locality = true
+     *   2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
+     *
+     * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
+     * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
+     * any-host requests since data-locality is critical.
+     */
+    if (preferredHost.equals("ANY_HOST")) {
+      log.info("Making a request for ANY_HOST ");
+      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null,
+          Priority.newInstance(ANY_HOST_PRIORITY), true, containerLabel);
     }
-    else
-    {
+    else {
       log.info("Making a preferred host request on " + preferredHost);
-      issuedRequest = new AMRMClient.ContainerRequest(
-              capability,
-              new String[]{preferredHost},
-              null,
-              priority,
-              false,
-              containerLabel);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, new String[]{preferredHost}, null,
+          Priority.newInstance(PREFERRED_HOST_PRIORITY), false, containerLabel);
     }
     //ensure that updating the state and making the request are done atomically.
     synchronized (lock) {