You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/02/26 02:17:13 UTC
[samza] branch master updated: SAMZA-2108: Check for host affinity config before resolving preferred host matching
This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8052463 SAMZA-2108: Check for host affinity config before resolving preferred host matching
8052463 is described below
commit 80524637ae3d9aa8be0e71b42c51d8b25d8e8efa
Author: Daniel Chen <dc...@linkedin.com>
AuthorDate: Mon Feb 25 18:17:07 2019 -0800
SAMZA-2108: Check for host affinity config before resolving preferred host matching
vjagadish rmatharu: Added a check in AbstractContainerAllocator so that the host-affinity config is consulted before the preferred-host mapping is used.
Author: Daniel Chen <dc...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #924 from dxichen/master
---
.../clustermanager/AbstractContainerAllocator.java | 15 ++--------
.../samza/clustermanager/ContainerAllocator.java | 18 ++++++++++++
.../HostAwareContainerAllocator.java | 23 +++++++++++++++
.../clustermanager/TestContainerAllocator.java | 33 ++++++++++++++++++++++
4 files changed, 77 insertions(+), 12 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 9f1afed..5547a32 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -161,22 +161,13 @@ public abstract class AbstractContainerAllocator implements Runnable {
/**
* Called during initial request for resources
*
- * @param resourceToHostMappings A Map of [containerId, hostName] containerId is the ID of the container process
+ * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
* to run on the resource. hostName is the host on which the resource must be allocated.
* The hostName value is null, either
- * - when host-affinity is not enabled, or
+ * - when host-affinity has never been enabled, or
* - when host-affinity is enabled and job is run for the first time
*/
- public void requestResources(Map<String, String> resourceToHostMappings) {
- for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
- String containerId = entry.getKey();
- String preferredHost = entry.getValue();
- if (preferredHost == null)
- preferredHost = ResourceRequestState.ANY_HOST;
-
- requestResource(containerId, preferredHost);
- }
- }
+ public abstract void requestResources(Map<String, String> resourceToHostMapping);
/**
* Checks if this allocator has a pending resource request.
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5997a7a..30dcd29 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import java.util.Map;
import org.apache.samza.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,4 +53,21 @@ public class ContainerAllocator extends AbstractContainerAllocator {
runStreamProcessor(request, ResourceRequestState.ANY_HOST);
}
}
+
+ /**
+ * Since host-affinity is not enabled, the container id to host mappings will be ignored and all resources will be
+ * matched to any available host.
+ *
+ * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
+ * to run on the resource. The hostName will be ignored and each container process will
+ * be matched to any available host.
+ */
+ @Override
+ public void requestResources(Map<String, String> resourceToHostMapping) {
+ for (Map.Entry<String, String> entry : resourceToHostMapping.entrySet()) {
+ String containerId = entry.getKey();
+ requestResource(containerId, ResourceRequestState.ANY_HOST);
+ }
+ }
+
}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index d59a893..50a19fa 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import java.util.Map;
import org.apache.samza.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +99,28 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
}
/**
+ * Since host-affinity is enabled, all container processes will be requested on their preferred host. If the job is
+ * run for the first time, it will get matched to any available host.
+ *
+ * @param resourceToHostMapping A Map of [containerId, hostName] containerId is the ID of the container process
+ * to run on the resource. hostName is the host on which the resource must be allocated.
+ * The hostName value is null when host-affinity is enabled and the job is run for
+ * the first time.
+ */
+ @Override
+ public void requestResources(Map<String, String> resourceToHostMapping) {
+ for (Map.Entry<String, String> entry : resourceToHostMapping.entrySet()) {
+ String containerId = entry.getKey();
+ String preferredHost = entry.getValue();
+ if (preferredHost == null) {
+ log.info("Preferred host not found for container id: {}", containerId);
+ preferredHost = ResourceRequestState.ANY_HOST;
+ }
+ requestResource(containerId, preferredHost);
+ }
+ }
+
+ /**
* Checks if a request has expired.
* @param request
* @return
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 4596673..dd40e7c 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -139,6 +139,39 @@ public class TestContainerAllocator {
}
/**
+ * Test requestContainers with containerToHostMapping with host.affinity disabled
+ */
+ @Test
+ public void testRequestContainersWithExistingHosts() throws Exception {
+ Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+ {
+ put("0", "prev_host");
+ put("1", "prev_host");
+ put("2", "prev_host");
+ put("3", "prev_host");
+ }
+ };
+
+ allocatorThread.start();
+
+ containerAllocator.requestResources(containersToHostMapping);
+
+ assertEquals(4, manager.resourceRequests.size());
+
+ assertNotNull(requestState);
+
+ assertEquals(requestState.numPendingRequests(), 4);
+
+ // If host-affinity is not enabled, it doesn't update the requestMap
+ assertNotNull(requestState.getRequestsToCountMap());
+ assertEquals(requestState.getRequestsToCountMap().keySet().size(), 0);
+
+ assertNotNull(state);
+ assertEquals(state.anyHostRequests.get(), 4);
+ assertEquals(state.preferredHostRequests.get(), 0);
+ }
+
+ /**
* Test request containers with no containerToHostMapping makes the right number of requests
*/
@Test