You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/11/04 22:37:05 UTC

[2/5] incubator-slider git commit: SLIDER-82 setting up node listings into AppState binding

SLIDER-82 setting up node listings into AppState binding


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/c6231fe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/c6231fe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/c6231fe7

Branch: refs/heads/feature/SLIDER-82-pass-3
Commit: c6231fe769190d6da21c77e6c74298b08f49319b
Parents: da24357
Author: Steve Loughran <st...@apache.org>
Authored: Wed Nov 4 18:11:28 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Nov 4 18:11:28 2015 +0000

----------------------------------------------------------------------
 .../slider/client/SliderYarnClientImpl.java     | 18 +++++++++--
 .../server/appmaster/SliderAppMaster.java       | 34 ++++++++++++--------
 .../appmaster/state/AppStateBindingInfo.java    |  3 ++
 .../slider/agent/rest/TestStandaloneREST.groovy |  4 +--
 .../appmaster/model/mock/MockYarnCluster.groovy | 11 +++----
 5 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c6231fe7/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
index 42759fd..803ccd6 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.slider.client;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -42,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -64,6 +67,17 @@ public class SliderYarnClientImpl extends YarnClientImpl {
    */
   public static final String KILL_ALL = "all";
 
+  /** Fail fast if the RM address is unset (null or 0.0.0.0) before delegating to the parent. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String addr = conf.get(YarnConfiguration.RM_ADDRESS);
+    if (addr == null || addr.startsWith("0.0.0.0")) {
+      throw new BindException("Invalid " + YarnConfiguration.RM_ADDRESS + " value:" + addr
+          + " - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort");
+    }
+    super.serviceInit(conf);
+  }
+
   /**
    * Get the RM Client RPC interface
    * @return an RPC interface valid after initialization and authentication
@@ -107,7 +121,6 @@ public class SliderYarnClientImpl extends YarnClientImpl {
     return results;
   }
 
-
   /**
    * find all instances of a specific app -if there is more than one in the
    * YARN cluster,
@@ -141,8 +154,7 @@ public class SliderYarnClientImpl extends YarnClientImpl {
   public boolean isApplicationLive(ApplicationReport app) {
     Preconditions.checkArgument(app != null, "Null app report");
 
-    return app.getYarnApplicationState().ordinal() <=
-           YarnApplicationState.RUNNING.ordinal();
+    return app.getYarnApplicationState().ordinal() <= YarnApplicationState.RUNNING.ordinal();
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c6231fe7/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index b552290..e6a5bd5 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
@@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
@@ -173,6 +172,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -215,8 +215,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    */
   protected static final Logger LOG_YARN = log;
 
-  public static final String SERVICE_CLASSNAME_SHORT =
-      "SliderAppMaster";
+  public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster";
   public static final String SERVICE_CLASSNAME =
       "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT;
 
@@ -495,8 +494,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     metrics.registerAll(new GarbageCollectorMetricSet());
 
 */
-    contentCache = ApplicationResouceContentCacheFactory.createContentCache(
-        stateForProviders);
+    contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders);
 
     executorService = new WorkflowExecutorService<>("AmExecutor",
         Executors.newFixedThreadPool(2,
@@ -504,6 +502,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     addService(executorService);
 
     addService(actionQueues);
+    // set up the YARN client. The RM client-API address must be known before the client is
+    // created: a missing value, or one starting with 0.0.0.0, is treated as unset server-side.
+    String clientRMaddr = conf.get(YarnConfiguration.RM_ADDRESS);
+    if (clientRMaddr == null || clientRMaddr.startsWith("0.0.0.0")) {
+      // address isn't known; fail fast
+      throw new BindException("Invalid " + YarnConfiguration.RM_ADDRESS + " value:" + clientRMaddr
+          + " - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort");
+    }
     addService(yarnClient = new SliderYarnClientImpl());
 
     //init all child services
@@ -564,8 +570,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
 
     //dump the system properties if in debug mode
     if (log.isDebugEnabled()) {
-      log.debug("System properties:\n" +
-                SliderUtils.propertiesToString(System.getProperties()));
+      log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties()));
     }
 
     //choose the action
@@ -634,8 +639,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     // obtain security state
     securityEnabled = securityConfiguration.isSecurityEnabled();
     // set the global security flag for the instance definition
-    instanceDefinition.getAppConfOperations().set(
-        KEY_SECURITY_ENABLED, securityEnabled);
+    instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);
 
     // triggers resolution and snapshotting for agent
     appState.setInitialInstanceDefinition(instanceDefinition);
@@ -653,8 +657,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       InternalKeys.INTERNAL_PROVIDER_NAME);
     log.info("Cluster provider type is {}", providerType);
     SliderProviderFactory factory =
-      SliderProviderFactory.createSliderProviderFactory(
-          providerType);
+      SliderProviderFactory.createSliderProviderFactory(providerType);
     providerService = factory.createServerProvider();
     // init the provider BUT DO NOT START IT YET
     initAndAddService(providerService);
@@ -673,8 +676,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
      * turned into an (incompete) container
      */
     appMasterContainerID = ConverterUtils.toContainerId(
-      SliderUtils.mandatoryEnvVariable(
-          ApplicationConstants.Environment.CONTAINER_ID.name()));
+      SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
     appAttemptID = appMasterContainerID.getApplicationAttemptId();
 
     ApplicationId appid = appAttemptID.getApplicationId();
@@ -687,6 +689,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     Map<String, String> envVars;
     List<Container> liveContainers;
 
+    List<NodeReport> nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+    log.info("Yarn node report count: {}", nodeReports.size());
+
     /*
      * It is critical this section is synchronized, to stop async AM events
      * arriving while registering a restarting AM.
@@ -844,6 +849,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       binding.liveContainers = liveContainers;
       binding.applicationInfo = appInformation;
       binding.releaseSelector = providerService.createContainerReleaseSelector();
+      binding.nodeReports = nodeReports;
       appState.buildInstance(binding);
 
       providerService.rebuildContainerDetails(liveContainers,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c6231fe7/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
index 184c8aa..a2a0b60 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.slider.core.conf.AggregateConf;
 import org.apache.slider.providers.ProviderRole;
 
@@ -46,6 +47,8 @@ public class AppStateBindingInfo {
   public List<Container> liveContainers = new ArrayList<>(0);
   public Map<String, String> applicationInfo = new HashMap<>();
   public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector();
+  /** Node reports from the RM; defaults to an empty list. If null, the app state needs to be given a node update later. */
+  public List<NodeReport> nodeReports = new ArrayList<>(0);
 
   public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition");

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c6231fe7/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy
index 97b3009..29fa51a 100644
--- a/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy
@@ -41,7 +41,6 @@ import static org.apache.slider.server.appmaster.web.rest.RestPaths.*
 @Slf4j
 class TestStandaloneREST extends AgentMiniClusterTestBase {
 
-
   @Test
   public void testStandaloneREST() throws Throwable {
 
@@ -65,8 +64,7 @@ class TestStandaloneREST extends AgentMiniClusterTestBase {
     ApplicationReport report = waitForClusterLive(client)
     def proxyAM = report.trackingUrl
     def directAM = report.originalTrackingUrl
-    
-    
+
     // set up url config to match
     initHttpTestSupport(launcher.configuration)
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c6231fe7/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy
index 6056e3a..99a9213 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy
@@ -147,14 +147,15 @@ public class MockYarnCluster {
   
 /**
  * Model cluster nodes on the simpler "slot" model than the YARN-era
- * resource allocation model. Why? Makes it easier to implement.
+ * resource allocation model. Why? Easier to implement scheduling.
+ * Of course, if someone does want to implement the full process...
  *
- * When a cluster is offline, 
  */
   public static class MockYarnClusterNode {
 
     public final int nodeIndex
     public final String hostname;
+    public List<String> labels = []
     public final MockNodeId nodeId;
     public final MockYarnClusterContainer[] containers;
     private boolean offline;
@@ -230,8 +231,6 @@ public class MockYarnCluster {
       }
       return result
     }
-    
-    
 
     /**
      * Release a container
@@ -291,8 +290,8 @@ public class MockYarnCluster {
     return (hostIndex << 8) | containerIndex & 0xff;
   }
 
-  public static final int extractHost(int cid) {
-    return (cid >>> 8);
+  public static final int extractHost(long cid) {
+    return (cid >>> 8) & 0xffff;
   }
 
   public static final int extractContainer(int cid) {