You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/26 00:17:39 UTC
samza git commit: SAMZA-1687: Prioritize preferred host requests over
ANY-HOST requests
Repository: samza
Updated Branches:
refs/heads/master 66525b51e -> 48b17be33
SAMZA-1687: Prioritize preferred host requests over ANY-HOST requests
Documentation that describes this in more detail is in progress; in short, preferred-host requests should be prioritized over ANY_HOST requests.
Yarn enforces these two checks:
1. ANY_HOST requests should always be made with relax-locality = true
2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
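Since the Samza AM makes preferred-host requests with relax-locality = false, ANY_HOST requests (which must use relax-locality = true) have to be issued at a different priority level. Preferred-host requests can safely be given the higher priority because data locality is critical. In YARN a lower numeric value means a higher priority, so preferred-host requests use priority 0 and ANY_HOST requests use priority 1.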
Author: Jagadish <jv...@linkedin.com>
Closes #488 from vjagadish/priority-host-affinity
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/48b17be3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/48b17be3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/48b17be3
Branch: refs/heads/master
Commit: 48b17be33d19ca9346cbbade0cca14ba334319ba
Parents: 66525b5
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Apr 25 17:17:34 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Apr 25 17:17:34 2018 -0700
----------------------------------------------------------------------
.../clustermanager/ResourceRequestState.java | 10 ++++--
.../job/yarn/YarnClusterResourceManager.java | 34 +++++++++++---------
2 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/48b17be3/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 51caa39..2dc31bf 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* this state is shared across both the Allocator Thread, and the Callback handler thread.
*
*/
-public class ResourceRequestState {
+public class ResourceRequestState {
private static final Logger log = LoggerFactory.getLogger(ResourceRequestState.class);
public static final String ANY_HOST = "ANY_HOST";
@@ -190,8 +190,12 @@ public class ResourceRequestState {
synchronized (lock) {
requestsQueue.remove(request);
// A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer.
- allocatedResources.get(assignedHost).remove(samzaResource);
- allocatedResources.get(ANY_HOST).remove(samzaResource);
+ if (allocatedResources.get(assignedHost) != null) {
+ allocatedResources.get(assignedHost).remove(samzaResource);
+ }
+ if (allocatedResources.get(ANY_HOST) != null) {
+ allocatedResources.get(ANY_HOST).remove(samzaResource);
+ }
if (hostAffinityEnabled) {
// assignedHost may not always be the preferred host.
// Hence, we should safely decrement the counter for the preferredHost
http://git-wip-us.apache.org/repos/asf/samza/blob/48b17be3/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index f8c7d9b..407768c 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -79,6 +79,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+ private static final int PREFERRED_HOST_PRIORITY = 0;
+ private static final int ANY_HOST_PRIORITY = 1;
+
private final String INVALID_YARN_CONTAINER_ID = "-1";
/**
@@ -213,7 +216,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
*/
@Override
public void requestResources(SamzaResourceRequest resourceRequest) {
- final int DEFAULT_PRIORITY = 0;
log.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
int memoryMb = resourceRequest.getMemoryMB();
@@ -221,25 +223,27 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
String containerLabel = yarnConfig.getContainerLabel();
String preferredHost = resourceRequest.getPreferredHost();
Resource capability = Resource.newInstance(memoryMb, cpuCores);
- Priority priority = Priority.newInstance(DEFAULT_PRIORITY);
AMRMClient.ContainerRequest issuedRequest;
- if (preferredHost.equals("ANY_HOST"))
- {
- log.info("Making a request for ANY_HOST " + preferredHost );
- issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, true, containerLabel);
+ /*
+ * Yarn enforces these two checks:
+ * 1. ANY_HOST requests should always be made with relax-locality = true
+ * 2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true
+ *
+ * Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests
+ * should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than
+ * any-host requests since data-locality is critical.
+ */
+ if (preferredHost.equals("ANY_HOST")) {
+ log.info("Making a request for ANY_HOST ");
+ issuedRequest = new AMRMClient.ContainerRequest(capability, null, null,
+ Priority.newInstance(ANY_HOST_PRIORITY), true, containerLabel);
}
- else
- {
+ else {
log.info("Making a preferred host request on " + preferredHost);
- issuedRequest = new AMRMClient.ContainerRequest(
- capability,
- new String[]{preferredHost},
- null,
- priority,
- false,
- containerLabel);
+ issuedRequest = new AMRMClient.ContainerRequest(capability, new String[]{preferredHost}, null,
+ Priority.newInstance(PREFERRED_HOST_PRIORITY), false, containerLabel);
}
//ensure that updating the state and making the request are done atomically.
synchronized (lock) {