You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/02 23:19:26 UTC
samza git commit: SAMZA-1138; Yarn capability check is broken
Repository: samza
Updated Branches:
refs/heads/master 888e06173 -> 101ca439b
SAMZA-1138; Yarn capability check is broken
After the migration from `yarn.container.*` properties to `cluster-manager.container.*` properties, either set of names may be used, and `ClusterManagerConfig` provides backward compatibility for these properties. However, `YarnClusterResourceManager` uses only the old properties (from `YarnConfig`); hence, if a job's config has been migrated to the new `cluster-manager.*` property names, the capability check is evaluated against the default values rather than the actual configured values.
Author: Maksim Logvinenko <ml...@gmail.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #84 from logarithm/yarn-properties-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/101ca439
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/101ca439
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/101ca439
Branch: refs/heads/master
Commit: 101ca439b8b22926db7423309a3ff880f396271a
Parents: 888e061
Author: Maksim Logvinenko <ml...@gmail.com>
Authored: Sun Apr 2 16:18:43 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Sun Apr 2 16:18:43 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/YarnConfig.java | 41 --------------------
.../job/yarn/YarnClusterResourceManager.java | 10 ++++-
2 files changed, 9 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 86e4ef7..aa4bc3e 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig {
*/
public static final String PACKAGE_PATH = "yarn.package.path";
- // Configs related to each yarn container
- /**
- * Memory, in megabytes, to request from YARN per container
- */
- public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
- private static final int DEFAULT_CONTAINER_MEM = 1024;
-
/**
* Name of YARN queue to run jobs on
*/
public static final String QUEUE_NAME = "yarn.queue";
/**
- * Number of CPU cores to request from YARN per container
- */
- public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
- private static final int DEFAULT_CPU_CORES = 1;
-
- /**
* Label to request from YARN for containers
*/
public static final String CONTAINER_LABEL = "yarn.container.label";
- /**
- * Maximum number of times the AM tries to restart a failed container
- */
- public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
- private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
-
- /**
- * Determines how frequently a container is allowed to fail before we give up and fail the job
- */
- public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
- private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
-
// Configs related to the Samza Application Master (AM)
/**
* (Optional) JVM options to include in the command line when executing the AM
@@ -150,26 +125,10 @@ public class YarnConfig extends MapConfig {
super(config);
}
- public int getContainerRetryCount() {
- return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
- }
-
- public int getContainerRetryWindowMs() {
- return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
- }
-
public int getAMPollIntervalMs() {
return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
}
- public int getContainerMaxMemoryMb() {
- return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
- }
-
- public int getContainerMaxCpuCores() {
- return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
- }
-
public String getContainerLabel() {
return get(CONTAINER_LABEL, null);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 04c78be..1fd3939 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.*;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
@@ -149,7 +150,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);
log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
- this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
+ ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+ this.lifecycle = new SamzaYarnAppMasterLifecycle(
+ clusterManagerConfig.getContainerMemoryMb(),
+ clusterManagerConfig.getNumCores(),
+ samzaAppState,
+ state,
+ amClient
+ );
yarnContainerRunner = new YarnContainerRunner(config, hConfig);
}