You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/02 23:19:26 UTC

samza git commit: SAMZA-1138; Yarn capability check is broken

Repository: samza
Updated Branches:
  refs/heads/master 888e06173 -> 101ca439b


SAMZA-1138; Yarn capability check is broken

After the migration from `yarn.container.*` properties to `cluster-manager.container.*` properties, either set of names may be used, and `ClusterManagerConfig` provides backward compatibility between them. But `YarnClusterResourceManager` uses only the old properties (from `YarnConfig`); hence, if a job config has been migrated to the new `cluster-manager.*` property names, the capability check is evaluated against the default values, not against the actual configured values.

Author: Maksim Logvinenko <ml...@gmail.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #84 from logarithm/yarn-properties-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/101ca439
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/101ca439
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/101ca439

Branch: refs/heads/master
Commit: 101ca439b8b22926db7423309a3ff880f396271a
Parents: 888e061
Author: Maksim Logvinenko <ml...@gmail.com>
Authored: Sun Apr 2 16:18:43 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Sun Apr 2 16:18:43 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/YarnConfig.java     | 41 --------------------
 .../job/yarn/YarnClusterResourceManager.java    | 10 ++++-
 2 files changed, 9 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 86e4ef7..aa4bc3e 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig {
    */
   public static final String PACKAGE_PATH = "yarn.package.path";
 
-  // Configs related to each yarn container
-  /**
-   * Memory, in megabytes, to request from YARN per container
-   */
-  public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
-  private static final int DEFAULT_CONTAINER_MEM = 1024;
-
   /**
    * Name of YARN queue to run jobs on
    */
   public static final String QUEUE_NAME = "yarn.queue";
 
   /**
-   * Number of CPU cores to request from YARN per container
-   */
-  public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
-  private static final int DEFAULT_CPU_CORES = 1;
-
-  /**
    * Label to request from YARN for containers
    */
   public static final String CONTAINER_LABEL = "yarn.container.label";
 
-  /**
-   * Maximum number of times the AM tries to restart a failed container
-   */
-  public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
-  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
-
-  /**
-   * Determines how frequently a container is allowed to fail before we give up and fail the job
-   */
-  public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
-  private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
-
   // Configs related to the Samza Application Master (AM)
   /**
    * (Optional) JVM options to include in the command line when executing the AM
@@ -150,26 +125,10 @@ public class YarnConfig extends MapConfig {
     super(config);
   }
 
-  public int getContainerRetryCount() {
-    return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
-  }
-
-  public int getContainerRetryWindowMs() {
-    return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
-  }
-
   public int getAMPollIntervalMs() {
     return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
   }
 
-  public int getContainerMaxMemoryMb() {
-    return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
-  }
-
-  public int getContainerMaxCpuCores() {
-    return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
-  }
-
   public String getContainerLabel() {
     return get(CONTAINER_LABEL, null);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 04c78be..1fd3939 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.*;
 import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaContainerLaunchException;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
@@ -149,7 +150,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);
 
     log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
-    this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.lifecycle = new SamzaYarnAppMasterLifecycle(
+        clusterManagerConfig.getContainerMemoryMb(),
+        clusterManagerConfig.getNumCores(),
+        samzaAppState,
+        state,
+        amClient
+    );
 
     yarnContainerRunner = new YarnContainerRunner(config, hConfig);
   }