You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:21 UTC

[05/50] [abbrv] samza git commit: SAMZA-792: SamzaAppMaster Java code needs to pass the requested container memory size to RM

SAMZA-792: SamzaAppMaster Java code needs to pass the requested container memory size to RM


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b14da282
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b14da282
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b14da282

Branch: refs/heads/samza-sql
Commit: b14da282cd92d28252a77adb80aa038ba0a66bc9
Parents: dfdc35e
Author: Navina <na...@gmail.com>
Authored: Thu Oct 8 16:16:26 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Oct 8 16:16:26 2015 -0700

----------------------------------------------------------------------
 .../job/yarn/AbstractContainerAllocator.java    | 23 +++++++----
 .../samza/job/yarn/ContainerAllocator.java      |  7 ++--
 .../job/yarn/HostAwareContainerAllocator.java   | 10 ++---
 .../samza/job/yarn/SamzaContainerRequest.java   |  1 +
 .../apache/samza/job/yarn/SamzaTaskManager.java |  6 +--
 .../samza/job/yarn/TestContainerAllocator.java  |  5 ++-
 .../yarn/TestHostAwareContainerAllocator.java   |  8 ++--
 .../samza/job/yarn/TestSamzaTaskManager.java    | 43 ++++++++++++++++----
 .../job/yarn/util/MockContainerAllocator.java   |  7 ++--
 9 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index eec1708..6edd477 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -21,11 +21,7 @@ package org.apache.samza.job.yarn;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
+import org.apache.samza.config.YarnConfig;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -46,6 +42,8 @@ public abstract class AbstractContainerAllocator implements Runnable {
   protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
   protected final int ALLOCATOR_SLEEP_TIME;
   protected final ContainerUtil containerUtil;
+  protected final int containerMaxMemoryMb;
+  protected final int containerMaxCpuCore;
 
   @Override
   public abstract void run();
@@ -58,12 +56,14 @@ public abstract class AbstractContainerAllocator implements Runnable {
 
   public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
                             ContainerUtil containerUtil,
-                            int allocatorSleepTime,
-                            ContainerRequestState containerRequestState) {
+                            ContainerRequestState containerRequestState,
+                            YarnConfig yarnConfig) {
     this.amClient = amClient;
     this.containerUtil = containerUtil;
-    this.ALLOCATOR_SLEEP_TIME = allocatorSleepTime;
+    this.ALLOCATOR_SLEEP_TIME = yarnConfig.getAllocatorSleepTime();
     this.containerRequestState = containerRequestState;
+    this.containerMaxMemoryMb = yarnConfig.getContainerMaxMemoryMb();
+    this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
   }
 
 
@@ -93,7 +93,12 @@ public abstract class AbstractContainerAllocator implements Runnable {
    * @param preferredHost Name of the host that you prefer to run the container on
    */
   public final void requestContainer(int expectedContainerId, String preferredHost) {
-    SamzaContainerRequest request = new SamzaContainerRequest(expectedContainerId, preferredHost);
+    SamzaContainerRequest request = new SamzaContainerRequest(
+        containerMaxMemoryMb,
+        containerMaxCpuCore,
+        DEFAULT_PRIORITY,
+        expectedContainerId,
+        preferredHost);
     containerRequestState.updateRequestState(request);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
index 9911540..7c57a86 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -21,11 +21,10 @@ package org.apache.samza.job.yarn;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.List;
-import java.util.Map;
 
 /**
  * This is the default allocator thread that will be used by SamzaTaskManager.
@@ -38,8 +37,8 @@ public class ContainerAllocator extends AbstractContainerAllocator {
 
   public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
                             ContainerUtil containerUtil,
-                            int allocatorSleepTime) {
-    super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, false));
+                            YarnConfig yarnConfig) {
+    super(amClient, containerUtil, new ContainerRequestState(amClient, false), yarnConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index e3b5868..ad1587d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -21,11 +21,10 @@ package org.apache.samza.job.yarn;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.List;
-import java.util.Map;
 
 /**
  * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -43,10 +42,9 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
 
   public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
                                      ContainerUtil containerUtil,
-                                     int allocatorSleepTime,
-                                     int containerRequestTimeout) {
-    super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, true));
-    this.CONTAINER_REQUEST_TIMEOUT = containerRequestTimeout;
+                                     YarnConfig yarnConfig) {
+    super(amClient, containerUtil, new ContainerRequestState(amClient, true), yarnConfig);
+    this.CONTAINER_REQUEST_TIMEOUT = yarnConfig.getContainerRequestTimeout();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
index 9441d77..a84e53f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
@@ -63,6 +63,7 @@ public class SamzaContainerRequest implements Comparable<SamzaContainerRequest>
     this.requestTimestamp = System.currentTimeMillis();
   }
 
+  // Convenience constructor for unit testing
   public SamzaContainerRequest(int expectedContainerId, String preferredHost) {
     this(
         AbstractContainerAllocator.DEFAULT_CONTAINER_MEM,

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
index 12f2f2c..d17ffe0 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
@@ -32,7 +32,6 @@ import org.apache.samza.config.YarnConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -84,14 +83,13 @@ class SamzaTaskManager implements YarnAppMasterListener {
       this.containerAllocator = new HostAwareContainerAllocator(
           amClient,
           new ContainerUtil(config, state, conf),
-          yarnConfig.getAllocatorSleepTime(),
-          yarnConfig.getContainerRequestTimeout()
+          yarnConfig
       );
     } else {
       this.containerAllocator = new ContainerAllocator(
           amClient,
           new ContainerUtil(config, state, conf),
-          yarnConfig.getAllocatorSleepTime());
+          yarnConfig);
     }
 
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 01f32a4..b20e351 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.server.HttpServer;
@@ -53,7 +54,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestContainerAllocator {
-  private final int ALLOCATOR_SLEEP_TIME = 10;
   private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
   private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
 
@@ -74,6 +74,7 @@ public class TestContainerAllocator {
       put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
       put("yarn.container.retry.count", "1");
       put("yarn.container.retry.window.ms", "1999999999");
+      put("yarn.allocator.sleep.ms", "10");
     }
   });
 
@@ -107,7 +108,7 @@ public class TestContainerAllocator {
     containerAllocator = new ContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(config, state),
-        ALLOCATOR_SLEEP_TIME
+        new YarnConfig(config)
     );
     Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
     requestStateField.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 663ea25..08e53aa 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.server.HttpServer;
@@ -54,8 +55,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestHostAwareContainerAllocator {
-  private static final int ALLOCATOR_SLEEP_TIME = 1;
-  private static final int CONTAINER_REQUEST_TIMEOUT = 3;
   private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
 
   private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
@@ -79,6 +78,8 @@ public class TestHostAwareContainerAllocator {
       put("yarn.container.retry.count", "1");
       put("yarn.container.retry.window.ms", "1999999999");
       put("yarn.samza.host-affinity.enabled", "true");
+      put("yarn.container.request.timeout.ms", "3");
+      put("yarn.allocator.sleep.ms", "1");
     }
   });
 
@@ -121,8 +122,7 @@ public class TestHostAwareContainerAllocator {
     containerAllocator = new HostAwareContainerAllocator(
         amRmClientAsync,
         containerUtil,
-        ALLOCATOR_SLEEP_TIME,
-        CONTAINER_REQUEST_TIMEOUT
+        new YarnConfig(config)
     );
     Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
     requestStateField.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index 4c1eaa9..b12ae5c 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
@@ -134,8 +135,13 @@ public class TestSamzaTaskManager {
 
   @Test
   public void testSamzaTaskManager() throws Exception {
+    Map<String, String> conf = new HashMap<>();
+    conf.putAll(getConfig());
+    conf.put("yarn.container.memory.mb", "500");
+    conf.put("yarn.container.cpu.cores", "5");
+
     SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
+        new MapConfig(conf),
         state,
         amRmClientAsync,
         new YarnConfiguration()
@@ -143,9 +149,17 @@ public class TestSamzaTaskManager {
 
     AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
     assertEquals(ContainerAllocator.class, allocator.getClass());
+    // Asserts that the Samza-exposed container configs are honored by the allocator thread
+    assertEquals(500, allocator.containerMaxMemoryMb);
+    assertEquals(5, allocator.containerMaxCpuCore);
+
+    conf.clear();
+    conf.putAll(getConfigWithHostAffinity());
+    conf.put("yarn.container.memory.mb", "500");
+    conf.put("yarn.container.cpu.cores", "5");
 
     taskManager = new SamzaTaskManager(
-        getConfigWithHostAffinity(),
+        new MapConfig(conf),
         state,
         amRmClientAsync,
         new YarnConfiguration()
@@ -153,12 +167,21 @@ public class TestSamzaTaskManager {
 
     allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
     assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
+    // Asserts that the Samza-exposed container configs are honored by the allocator thread
+    assertEquals(500, allocator.containerMaxMemoryMb);
+    assertEquals(5, allocator.containerMaxCpuCore);
+  }
+
+  @Test
+  public void testContainerConfigsAreHonoredInAllocator() {
+
   }
 
   @Test
   public void testOnInit() throws Exception {
+    Config conf = getConfig();
     SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
+        conf,
         state,
         amRmClientAsync,
         new YarnConfiguration()
@@ -167,7 +190,7 @@ public class TestSamzaTaskManager {
     MockContainerAllocator allocator = new MockContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(getConfig(), state),
-        1);
+        new YarnConfig(conf));
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() {
@@ -238,8 +261,9 @@ public class TestSamzaTaskManager {
    */
   @Test
   public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
+    Config conf = getConfig();
     SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
+        conf,
         state,
         amRmClientAsync,
         new YarnConfiguration()
@@ -247,7 +271,7 @@ public class TestSamzaTaskManager {
     MockContainerAllocator allocator = new MockContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(getConfig(), state),
-        1);
+        new YarnConfig(conf));
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     Thread thread = new Thread(allocator);
@@ -303,8 +327,9 @@ public class TestSamzaTaskManager {
    */
   @Test
   public void testSameContainerRequestedOnFailureWithUnknownCode() throws Exception {
+    Config conf = getConfigWithHostAffinity();
     SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfigWithHostAffinity(),
+        conf,
         state,
         amRmClientAsync,
         new YarnConfiguration()
@@ -312,7 +337,7 @@ public class TestSamzaTaskManager {
     MockContainerAllocator allocator = new MockContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(getConfig(), state),
-        1);
+        new YarnConfig(conf));
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     Thread thread = new Thread(allocator);
@@ -381,7 +406,7 @@ public class TestSamzaTaskManager {
     MockContainerAllocator allocator = new MockContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(getConfig(), state),
-        1);
+        new YarnConfig(new MapConfig(config)));
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     Thread thread = new Thread(allocator);

http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
index 85f871a..5fcad82 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
@@ -20,9 +20,8 @@ package org.apache.samza.job.yarn.util;
 
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.job.yarn.AbstractContainerAllocator;
+import org.apache.samza.config.YarnConfig;
 import org.apache.samza.job.yarn.ContainerAllocator;
-import org.apache.samza.job.yarn.ContainerRequestState;
 import org.apache.samza.job.yarn.ContainerUtil;
 
 import java.util.Map;
@@ -32,8 +31,8 @@ public class MockContainerAllocator extends ContainerAllocator {
 
   public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync,
                                 ContainerUtil containerUtil,
-                                int allocatorSleepTime) {
-    super(amrmClientAsync, containerUtil, allocatorSleepTime);
+                                YarnConfig yarnConfig) {
+    super(amrmClientAsync, containerUtil, yarnConfig);
   }
 
   @Override