You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:21 UTC
[05/50] [abbrv] samza git commit: SAMZA-792: SamzaAppMaster Java code
needs to pass the requested container memory size to RM
SAMZA-792: SamzaAppMaster Java code needs to pass the requested container memory size to RM
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b14da282
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b14da282
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b14da282
Branch: refs/heads/samza-sql
Commit: b14da282cd92d28252a77adb80aa038ba0a66bc9
Parents: dfdc35e
Author: Navina <na...@gmail.com>
Authored: Thu Oct 8 16:16:26 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Oct 8 16:16:26 2015 -0700
----------------------------------------------------------------------
.../job/yarn/AbstractContainerAllocator.java | 23 +++++++----
.../samza/job/yarn/ContainerAllocator.java | 7 ++--
.../job/yarn/HostAwareContainerAllocator.java | 10 ++---
.../samza/job/yarn/SamzaContainerRequest.java | 1 +
.../apache/samza/job/yarn/SamzaTaskManager.java | 6 +--
.../samza/job/yarn/TestContainerAllocator.java | 5 ++-
.../yarn/TestHostAwareContainerAllocator.java | 8 ++--
.../samza/job/yarn/TestSamzaTaskManager.java | 43 ++++++++++++++++----
.../job/yarn/util/MockContainerAllocator.java | 7 ++--
9 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index eec1708..6edd477 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -21,11 +21,7 @@ package org.apache.samza.job.yarn;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
+import org.apache.samza.config.YarnConfig;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,6 +42,8 @@ public abstract class AbstractContainerAllocator implements Runnable {
protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
protected final int ALLOCATOR_SLEEP_TIME;
protected final ContainerUtil containerUtil;
+ protected final int containerMaxMemoryMb;
+ protected final int containerMaxCpuCore;
@Override
public abstract void run();
@@ -58,12 +56,14 @@ public abstract class AbstractContainerAllocator implements Runnable {
public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
ContainerUtil containerUtil,
- int allocatorSleepTime,
- ContainerRequestState containerRequestState) {
+ ContainerRequestState containerRequestState,
+ YarnConfig yarnConfig) {
this.amClient = amClient;
this.containerUtil = containerUtil;
- this.ALLOCATOR_SLEEP_TIME = allocatorSleepTime;
+ this.ALLOCATOR_SLEEP_TIME = yarnConfig.getAllocatorSleepTime();
this.containerRequestState = containerRequestState;
+ this.containerMaxMemoryMb = yarnConfig.getContainerMaxMemoryMb();
+ this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
}
@@ -93,7 +93,12 @@ public abstract class AbstractContainerAllocator implements Runnable {
* @param preferredHost Name of the host that you prefer to run the container on
*/
public final void requestContainer(int expectedContainerId, String preferredHost) {
- SamzaContainerRequest request = new SamzaContainerRequest(expectedContainerId, preferredHost);
+ SamzaContainerRequest request = new SamzaContainerRequest(
+ containerMaxMemoryMb,
+ containerMaxCpuCore,
+ DEFAULT_PRIORITY,
+ expectedContainerId,
+ preferredHost);
containerRequestState.updateRequestState(request);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
index 9911540..7c57a86 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -21,11 +21,10 @@ package org.apache.samza.job.yarn;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.List;
-import java.util.Map;
/**
* This is the default allocator thread that will be used by SamzaTaskManager.
@@ -38,8 +37,8 @@ public class ContainerAllocator extends AbstractContainerAllocator {
public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
ContainerUtil containerUtil,
- int allocatorSleepTime) {
- super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, false));
+ YarnConfig yarnConfig) {
+ super(amClient, containerUtil, new ContainerRequestState(amClient, false), yarnConfig);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index e3b5868..ad1587d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -21,11 +21,10 @@ package org.apache.samza.job.yarn;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.List;
-import java.util.Map;
/**
* This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -43,10 +42,9 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
ContainerUtil containerUtil,
- int allocatorSleepTime,
- int containerRequestTimeout) {
- super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, true));
- this.CONTAINER_REQUEST_TIMEOUT = containerRequestTimeout;
+ YarnConfig yarnConfig) {
+ super(amClient, containerUtil, new ContainerRequestState(amClient, true), yarnConfig);
+ this.CONTAINER_REQUEST_TIMEOUT = yarnConfig.getContainerRequestTimeout();
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
index 9441d77..a84e53f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
@@ -63,6 +63,7 @@ public class SamzaContainerRequest implements Comparable<SamzaContainerRequest>
this.requestTimestamp = System.currentTimeMillis();
}
+ // Convenience constructor for unit testing
public SamzaContainerRequest(int expectedContainerId, String preferredHost) {
this(
AbstractContainerAllocator.DEFAULT_CONTAINER_MEM,
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
index 12f2f2c..d17ffe0 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
@@ -32,7 +32,6 @@ import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.HashMap;
import java.util.Map;
@@ -84,14 +83,13 @@ class SamzaTaskManager implements YarnAppMasterListener {
this.containerAllocator = new HostAwareContainerAllocator(
amClient,
new ContainerUtil(config, state, conf),
- yarnConfig.getAllocatorSleepTime(),
- yarnConfig.getContainerRequestTimeout()
+ yarnConfig
);
} else {
this.containerAllocator = new ContainerAllocator(
amClient,
new ContainerUtil(config, state, conf),
- yarnConfig.getAllocatorSleepTime());
+ yarnConfig);
}
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 01f32a4..b20e351 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.server.HttpServer;
@@ -53,7 +54,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestContainerAllocator {
- private final int ALLOCATOR_SLEEP_TIME = 10;
private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
@@ -74,6 +74,7 @@ public class TestContainerAllocator {
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
put("yarn.container.retry.count", "1");
put("yarn.container.retry.window.ms", "1999999999");
+ put("yarn.allocator.sleep.ms", "10");
}
});
@@ -107,7 +108,7 @@ public class TestContainerAllocator {
containerAllocator = new ContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(config, state),
- ALLOCATOR_SLEEP_TIME
+ new YarnConfig(config)
);
Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
requestStateField.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 663ea25..08e53aa 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.server.HttpServer;
@@ -54,8 +55,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestHostAwareContainerAllocator {
- private static final int ALLOCATOR_SLEEP_TIME = 1;
- private static final int CONTAINER_REQUEST_TIMEOUT = 3;
private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
@@ -79,6 +78,8 @@ public class TestHostAwareContainerAllocator {
put("yarn.container.retry.count", "1");
put("yarn.container.retry.window.ms", "1999999999");
put("yarn.samza.host-affinity.enabled", "true");
+ put("yarn.container.request.timeout.ms", "3");
+ put("yarn.allocator.sleep.ms", "1");
}
});
@@ -121,8 +122,7 @@ public class TestHostAwareContainerAllocator {
containerAllocator = new HostAwareContainerAllocator(
amRmClientAsync,
containerUtil,
- ALLOCATOR_SLEEP_TIME,
- CONTAINER_REQUEST_TIMEOUT
+ new YarnConfig(config)
);
Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
requestStateField.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index 4c1eaa9..b12ae5c 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.YarnConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinator;
@@ -134,8 +135,13 @@ public class TestSamzaTaskManager {
@Test
public void testSamzaTaskManager() throws Exception {
+ Map<String, String> conf = new HashMap<>();
+ conf.putAll(getConfig());
+ conf.put("yarn.container.memory.mb", "500");
+ conf.put("yarn.container.cpu.cores", "5");
+
SamzaTaskManager taskManager = new SamzaTaskManager(
- getConfig(),
+ new MapConfig(conf),
state,
amRmClientAsync,
new YarnConfiguration()
@@ -143,9 +149,17 @@ public class TestSamzaTaskManager {
AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
assertEquals(ContainerAllocator.class, allocator.getClass());
+ // Asserts that the Samza-exposed container configs are honored by the allocator thread
+ assertEquals(500, allocator.containerMaxMemoryMb);
+ assertEquals(5, allocator.containerMaxCpuCore);
+
+ conf.clear();
+ conf.putAll(getConfigWithHostAffinity());
+ conf.put("yarn.container.memory.mb", "500");
+ conf.put("yarn.container.cpu.cores", "5");
taskManager = new SamzaTaskManager(
- getConfigWithHostAffinity(),
+ new MapConfig(conf),
state,
amRmClientAsync,
new YarnConfiguration()
@@ -153,12 +167,21 @@ public class TestSamzaTaskManager {
allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
+ // Asserts that the Samza-exposed container configs are honored by the allocator thread
+ assertEquals(500, allocator.containerMaxMemoryMb);
+ assertEquals(5, allocator.containerMaxCpuCore);
+ }
+
+ @Test
+ public void testContainerConfigsAreHonoredInAllocator() {
+
}
@Test
public void testOnInit() throws Exception {
+ Config conf = getConfig();
SamzaTaskManager taskManager = new SamzaTaskManager(
- getConfig(),
+ conf,
state,
amRmClientAsync,
new YarnConfiguration()
@@ -167,7 +190,7 @@ public class TestSamzaTaskManager {
MockContainerAllocator allocator = new MockContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(getConfig(), state),
- 1);
+ new YarnConfig(conf));
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() {
@@ -238,8 +261,9 @@ public class TestSamzaTaskManager {
*/
@Test
public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
+ Config conf = getConfig();
SamzaTaskManager taskManager = new SamzaTaskManager(
- getConfig(),
+ conf,
state,
amRmClientAsync,
new YarnConfiguration()
@@ -247,7 +271,7 @@ public class TestSamzaTaskManager {
MockContainerAllocator allocator = new MockContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(getConfig(), state),
- 1);
+ new YarnConfig(conf));
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
Thread thread = new Thread(allocator);
@@ -303,8 +327,9 @@ public class TestSamzaTaskManager {
*/
@Test
public void testSameContainerRequestedOnFailureWithUnknownCode() throws Exception {
+ Config conf = getConfigWithHostAffinity();
SamzaTaskManager taskManager = new SamzaTaskManager(
- getConfigWithHostAffinity(),
+ conf,
state,
amRmClientAsync,
new YarnConfiguration()
@@ -312,7 +337,7 @@ public class TestSamzaTaskManager {
MockContainerAllocator allocator = new MockContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(getConfig(), state),
- 1);
+ new YarnConfig(conf));
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
Thread thread = new Thread(allocator);
@@ -381,7 +406,7 @@ public class TestSamzaTaskManager {
MockContainerAllocator allocator = new MockContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(getConfig(), state),
- 1);
+ new YarnConfig(new MapConfig(config)));
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
Thread thread = new Thread(allocator);
http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
index 85f871a..5fcad82 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
@@ -20,9 +20,8 @@ package org.apache.samza.job.yarn.util;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.job.yarn.AbstractContainerAllocator;
+import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.yarn.ContainerAllocator;
-import org.apache.samza.job.yarn.ContainerRequestState;
import org.apache.samza.job.yarn.ContainerUtil;
import java.util.Map;
@@ -32,8 +31,8 @@ public class MockContainerAllocator extends ContainerAllocator {
public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync,
ContainerUtil containerUtil,
- int allocatorSleepTime) {
- super(amrmClientAsync, containerUtil, allocatorSleepTime);
+ YarnConfig yarnConfig) {
+ super(amrmClientAsync, containerUtil, yarnConfig);
}
@Override