You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/02/26 02:17:13 UTC

[samza] branch master updated: SAMZA-2108: Check for host affinity config before resolving preferred host matching

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8052463  SAMZA-2108: Check for host affinity config before resolving preferred host matching
8052463 is described below

commit 80524637ae3d9aa8be0e71b42c51d8b25d8e8efa
Author: Daniel Chen <dc...@linkedin.com>
AuthorDate: Mon Feb 25 18:17:07 2019 -0800

    SAMZA-2108: Check for host affinity config before resolving preferred host matching
    
    vjagadish rmatharu  Added check in the AbstractContainerAllocator to check the host-affinity config before using the preferred host mapping
    
    Author: Daniel Chen <dc...@linkedin.com>
    
    Reviewers: Jagadish <ja...@apache.org>
    
    Closes #924 from dxichen/master
---
 .../clustermanager/AbstractContainerAllocator.java | 15 ++--------
 .../samza/clustermanager/ContainerAllocator.java   | 18 ++++++++++++
 .../HostAwareContainerAllocator.java               | 23 +++++++++++++++
 .../clustermanager/TestContainerAllocator.java     | 33 ++++++++++++++++++++++
 4 files changed, 77 insertions(+), 12 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 9f1afed..5547a32 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -161,22 +161,13 @@ public abstract class AbstractContainerAllocator implements Runnable {
   /**
    * Called during initial request for resources
    *
-   * @param resourceToHostMappings A Map of [containerId, hostName] containerId is the ID of the container process
+   * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
    *                               to run on the resource. hostName is the host on which the resource must be allocated.
    *                                The hostName value is null, either
-   *                                - when host-affinity is not enabled, or
+   *                                - when host-affinity has never been enabled, or
    *                                - when host-affinity is enabled and job is run for the first time
    */
-  public void requestResources(Map<String, String> resourceToHostMappings) {
-    for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
-      String containerId = entry.getKey();
-      String preferredHost = entry.getValue();
-      if (preferredHost == null)
-        preferredHost = ResourceRequestState.ANY_HOST;
-
-      requestResource(containerId, preferredHost);
-    }
-  }
+  public abstract void requestResources(Map<String, String> resourceToHostMapping);
 
   /**
    * Checks if this allocator has a pending resource request.
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5997a7a..30dcd29 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,4 +53,21 @@ public class ContainerAllocator extends AbstractContainerAllocator {
       runStreamProcessor(request, ResourceRequestState.ANY_HOST);
     }
   }
+
+  /**
+   * Since host-affinity is not enabled, the container id to host mappings will be ignored and all resources will be
+   * matched to any available host.
+   *
+   * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
+   *                               to run on the resource. The hostName will be ignored and each container process will
+   *                               be matched to any available host.
+   */
+  @Override
+  public void requestResources(Map<String, String> resourceToHostMapping)  {
+    for (Map.Entry<String, String> entry : resourceToHostMapping.entrySet()) {
+      String containerId = entry.getKey();
+      requestResource(containerId, ResourceRequestState.ANY_HOST);
+    }
+  }
+
 }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index d59a893..50a19fa 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +99,28 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
   }
 
   /**
+   * Since host-affinity is enabled, all container processes will be requested on their preferred host. If the job is
+   * run for the first time, it will get matched to any available host.
+   *
+   * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
+   *                               to run on the resource. hostName is the host on which the resource must be allocated.
+   *                                The hostName value is null when host-affinity is enabled and job is run for the
+   *                                first time
+   */
+  @Override
+  public void requestResources(Map<String, String> resourceToHostMapping) {
+    for (Map.Entry<String, String> entry : resourceToHostMapping.entrySet()) {
+      String containerId = entry.getKey();
+      String preferredHost = entry.getValue();
+      if (preferredHost == null) {
+        log.info("Preferred host not found for container id: {}", containerId);
+        preferredHost = ResourceRequestState.ANY_HOST;
+      }
+      requestResource(containerId, preferredHost);
+    }
+  }
+
+  /**
    * Checks if a request has expired.
    * @param request
    * @return
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 4596673..dd40e7c 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -139,6 +139,39 @@ public class TestContainerAllocator {
   }
 
   /**
+   * Test requestResources with an existing containersToHostMapping when host-affinity is disabled
+   */
+  @Test
+  public void testRequestContainersWithExistingHosts() throws Exception {
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", "prev_host");
+        put("1", "prev_host");
+        put("2", "prev_host");
+        put("3", "prev_host");
+      }
+    };
+
+    allocatorThread.start();
+
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertEquals(4, manager.resourceRequests.size());
+
+    assertNotNull(requestState);
+
+    assertEquals(requestState.numPendingRequests(), 4);
+
+    // If host-affinity is not enabled, it doesn't update the requestMap
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertEquals(requestState.getRequestsToCountMap().keySet().size(), 0);
+
+    assertNotNull(state);
+    assertEquals(state.anyHostRequests.get(), 4);
+    assertEquals(state.preferredHostRequests.get(), 0);
+  }
+
+  /**
    * Test request containers with no containerToHostMapping makes the right number of requests
    */
   @Test