You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") in the original page and is not preserved in this plain-text view.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/10 08:30:53 UTC
samza git commit: SAMZA-866: Refactor container allocator classes
Repository: samza
Updated Branches:
refs/heads/master 7c23e24f2 -> 495f2eb8e
SAMZA-866: Refactor container allocator classes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/495f2eb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/495f2eb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/495f2eb8
Branch: refs/heads/master
Commit: 495f2eb8e66d0729f7d5b66cdcff47eda32ef8a6
Parents: 7c23e24
Author: Jacob Maes <ja...@gmail.com>
Authored: Tue Feb 9 23:30:21 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Feb 9 23:30:21 2016 -0800
----------------------------------------------------------------------
.../job/yarn/AbstractContainerAllocator.java | 33 +++++++-
.../samza/job/yarn/ContainerAllocator.java | 40 ++++-----
.../samza/job/yarn/ContainerRequestState.java | 58 ++++++-------
.../job/yarn/HostAwareContainerAllocator.java | 87 ++++++++------------
4 files changed, 110 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index 9ee2dac..2e192ee 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
@@ -34,6 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
* See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
*/
public abstract class AbstractContainerAllocator implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
+
public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_CONTAINER_MEM = 1024;
@@ -45,9 +50,6 @@ public abstract class AbstractContainerAllocator implements Runnable {
protected final int containerMaxMemoryMb;
protected final int containerMaxCpuCore;
- @Override
- public abstract void run();
-
// containerRequestState indicate the state of all unfulfilled container requests and allocated containers
protected final ContainerRequestState containerRequestState;
@@ -66,6 +68,31 @@ public abstract class AbstractContainerAllocator implements Runnable {
this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
}
+ /**
+ * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
+ * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
+ *
+ * Terminates when the isRunning flag is cleared.
+ */
+ @Override
+ public void run() {
+ while(isRunning.get()) {
+ try {
+ assignContainerRequests();
+ Thread.sleep(ALLOCATOR_SLEEP_TIME);
+ } catch (InterruptedException e) {
+ log.info("Got InterruptedException in AllocatorThread.", e);
+ } catch (Exception e) {
+ log.error("Got unknown Exception in AllocatorThread.", e);
+ }
+ }
+ }
+
+ /**
+ * Assigns the container requests from the queue to the allocated containers from the cluster manager and
+ * runs them.
+ */
+ protected abstract void assignContainerRequests();
/**
* Called during initial request for containers
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
index 7c57a86..31fcc57 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -18,13 +18,13 @@
*/
package org.apache.samza.job.yarn;
+import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
/**
* This is the default allocator thread that will be used by SamzaTaskManager.
@@ -42,36 +42,28 @@ public class ContainerAllocator extends AbstractContainerAllocator {
}
/**
- * During the run() method, the thread sleeps for ALLOCATOR_SLEEP_TIME ms. It tries to allocate any unsatisfied
- * request that is still in the request queue (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
+ * This method tries to allocate any unsatisfied request that is still in the request queue
+ * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
* with allocated containers, if any.
*
* Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
* */
@Override
- public void run() {
- while(isRunning.get()) {
- try {
- List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
- while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
- SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
- Container container = allocatedContainers.get(0);
-
- // Update state
- containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
+ public void assignContainerRequests() {
+ List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+ while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
+ SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+ Container container = allocatedContainers.get(0);
- // Cancel request and run container
- log.info("Running {} on {}", request.expectedContainerId, container.getId());
- containerUtil.runContainer(request.expectedContainerId, container);
- }
+ // Update state
+ containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
- // If requestQueue is empty, all extra containers in the buffer should be released.
- containerRequestState.releaseExtraContainers();
-
- Thread.sleep(ALLOCATOR_SLEEP_TIME);
- } catch (InterruptedException e) {
- log.info("Got InterruptedException in AllocatorThread. Pending Container request(s) cannot be fulfilled!!", e);
- }
+ // Cancel request and run container
+ log.info("Running {} on {}", request.expectedContainerId, container.getId());
+ containerUtil.runContainer(request.expectedContainerId, container);
}
+
+ // If requestQueue is empty, all extra containers in the buffer should be released.
+ containerRequestState.releaseExtraContainers();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
index ab3061e..54db5e5 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -18,18 +18,17 @@
*/
package org.apache.samza.job.yarn;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class maintains the state variables for all the container requests and the allocated containers returned
@@ -205,35 +204,35 @@ public class ContainerRequestState {
public synchronized int releaseExtraContainers() {
int numReleasedContainers = 0;
- if (hostAffinityEnabled) {
- if (requestsQueue.isEmpty()) {
- log.debug("Container Requests Queue is empty.");
+ if (requestsQueue.isEmpty()) {
+ log.debug("Container Requests Queue is empty.");
+ if (hostAffinityEnabled) {
List<String> allocatedHosts = getAllocatedHosts();
for (String host : allocatedHosts) {
- List<Container> containers = getContainersOnAHost(host);
- if (containers != null) {
- for (Container c : containers) {
- log.info("Releasing extra container {} allocated on {}", c.getId(), host);
- amClient.releaseAssignedContainer(c.getId());
- numReleasedContainers++;
- }
- }
+ numReleasedContainers += releaseContainersForHost(host);
}
- clearState();
+ } else {
+ numReleasedContainers += releaseContainersForHost(ANY_HOST);
}
- } else {
- if (requestsQueue.isEmpty()) {
- log.debug("No more pending requests in Container Requests Queue.");
+ clearState();
+ }
+ return numReleasedContainers;
+ }
- List<Container> availableContainers = getContainersOnAHost(ANY_HOST);
- while(availableContainers != null && !availableContainers.isEmpty()) {
- Container c = availableContainers.remove(0);
- log.info("Releasing extra allocated container - {}", c.getId());
- amClient.releaseAssignedContainer(c.getId());
- numReleasedContainers++;
- }
- clearState();
+ /**
+ * Releases all allocated containers for the specified host.
+ * @param host the host for which the containers should be released.
+ * @return the number of containers released.
+ */
+ private int releaseContainersForHost(String host) {
+ int numReleasedContainers = 0;
+ List<Container> containers = getContainersOnAHost(host);
+ if (containers != null) {
+ for (Container c : containers) {
+ log.info("Releasing extra container {} allocated on {}", c.getId(), host);
+ amClient.releaseAssignedContainer(c.getId());
+ numReleasedContainers++;
}
}
return numReleasedContainers;
@@ -242,6 +241,7 @@ public class ContainerRequestState {
/**
* Clears all the state variables
* Performed when there are no more unfulfilled requests
+ * This is not synchronized because it is private.
*/
private void clearState() {
allocatedContainers.clear();
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index ff22dbf..8e1db77 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -18,13 +18,13 @@
*/
package org.apache.samza.job.yarn;
+import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
/**
* This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -55,66 +55,49 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
* allocatedContainers buffer keyed by "ANY_HOST".
*/
@Override
- public void run() {
- try {
- while (isRunning.get()) {
- while (!containerRequestState.getRequestsQueue().isEmpty()) {
- SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
- String preferredHost = request.getPreferredHost();
- int expectedContainerId = request.getExpectedContainerId();
+ public void assignContainerRequests() {
+ while (!containerRequestState.getRequestsQueue().isEmpty()) {
+ SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+ String preferredHost = request.getPreferredHost();
+ int expectedContainerId = request.getExpectedContainerId();
- log.info(
- "Handling request for container id {} on preferred host {}",
- expectedContainerId,
- preferredHost);
+ log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
- List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
- if (allocatedContainers != null && allocatedContainers.size() > 0) {
- // Found allocated container at preferredHost
- Container container = allocatedContainers.get(0);
+ List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
+ if (allocatedContainers != null && allocatedContainers.size() > 0) {
+ // Found allocated container at preferredHost
+ Container container = allocatedContainers.get(0);
- containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+ containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+ log.info("Running {} on {}", expectedContainerId, container.getId());
+ containerUtil.runMatchedContainer(expectedContainerId, container);
+ } else {
+ // No allocated container on preferredHost
+ log.info("Did not find any allocated containers on preferred host {} for running container id {}",
+ preferredHost, expectedContainerId);
+ boolean expired = requestExpired(request);
+ allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+ if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
+ log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
+ + "find any free allocated containers in the buffer. Breaking out of loop.",
+ request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
+ break;
+ } else {
+ if (allocatedContainers.size() > 0) {
+ Container container = allocatedContainers.get(0);
+ log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with "
+ + "timestamp {} to container {}",
+ new Object[]{String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
+ containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
log.info("Running {} on {}", expectedContainerId, container.getId());
- containerUtil.runMatchedContainer(expectedContainerId, container);
- } else {
- // No allocated container on preferredHost
- log.info(
- "Did not find any allocated containers on preferred host {} for running container id {}",
- preferredHost,
- expectedContainerId);
- boolean expired = requestExpired(request);
- allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
- if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
- log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't " +
- "find any free allocated containers in the buffer. Breaking out of loop.",
- request.getRequestTimestamp(),
- CONTAINER_REQUEST_TIMEOUT);
- break;
- } else {
- if (allocatedContainers.size() > 0) {
- Container container = allocatedContainers.get(0);
- log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with " +
- "timestamp {} to container {}",
- new Object[] { String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()
- });
- containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
- log.info("Running {} on {}", expectedContainerId, container.getId());
- containerUtil.runContainer(expectedContainerId, container);
- }
- }
+ containerUtil.runContainer(expectedContainerId, container);
}
}
- // Release extra containers and update the entire system's state
- containerRequestState.releaseExtraContainers();
-
- Thread.sleep(ALLOCATOR_SLEEP_TIME);
}
- } catch (InterruptedException ie) {
- log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", ie);
- } catch (Exception e) {
- log.info("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
}
+ // Release extra containers and update the entire system's state
+ containerRequestState.releaseExtraContainers();
}
private boolean requestExpired(SamzaContainerRequest request) {