You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/10/03 00:26:01 UTC

[4/7] git commit: SLIDER-476 make AM queue executor non-reentrant

SLIDER-476 make AM queue executor non-reentrant


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/755bb7ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/755bb7ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/755bb7ce

Branch: refs/heads/develop
Commit: 755bb7ce951a97fbbbca420429b6a254576d2b38
Parents: ddde030
Author: Steve Loughran <st...@apache.org>
Authored: Thu Oct 2 14:24:46 2014 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Oct 2 14:24:46 2014 -0700

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       | 49 +++++++++++++-------
 .../appmaster/actions/ActionStopQueue.java      |  8 +++-
 .../server/appmaster/actions/AsyncAction.java   |  2 +-
 .../server/appmaster/actions/QueueExecutor.java |  2 +
 4 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 24d5eea..cc4f6fe 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -409,7 +409,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     SliderUtils.validateSliderServerEnvironment(log);
 
     executorService = new WorkflowExecutorService<ExecutorService>("AmExecutor",
-        Executors.newSingleThreadExecutor(
+        Executors.newCachedThreadPool(
         new ServiceThreadFactory("AmExecutor", true)));
     addService(executorService);
 
@@ -1242,18 +1242,24 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    * It should be the only way that anything -even the AM itself on startup-
    * asks for nodes. 
    * @param resources the resource tree
-   * @throws IOException
+   * @throws SliderException slider problems, including invalid configs
+   * @throws IOException IO problems
    */
   private void flexCluster(ConfTree resources)
-    throws IOException, SliderInternalStateException, BadConfigException {
+      throws IOException, SliderException {
+
+    AggregateConf newConf =
+        new AggregateConf(appState.getInstanceDefinitionSnapshot());
+    newConf.setResources(resources);
+    // verify the new definition is valid
+    sliderAMProvider.validateInstanceDefinition(newConf);
+    providerService.validateInstanceDefinition(newConf);
 
     appState.updateResourceDefinitions(resources);
 
     // reset the scheduled windows...the values
     // may have changed
     appState.resetFailureCounts();
-    
-
 
     // ask for more containers if needed
     reviewRequestAndReleaseNodes("flexCluster");
@@ -1416,11 +1422,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
 /* SliderClusterProtocol */
 /* =================================================================== */
 
+  /**
+   * General actions to perform on a slider RPC call coming in
+   * @param operation operation to log
+   * @throws IOException problems
+   */
+  protected void onRpcCall(String operation) throws IOException {
+    // it's not clear why this is here —it has been present since the
+    // code -> git change. Leaving it in
+    SliderUtils.getCurrentUser();
+    log.debug("Received call to {}", operation);
+  }
+
   @Override //SliderClusterProtocol
   public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws
                                                                                                  IOException,
                                                                                                  YarnException {
-    SliderUtils.getCurrentUser();
+    onRpcCall("stopCluster()");
     String message = request.getMessage();
     log.info("SliderAppMasterApi.stopCluster: {}", message);
     schedule(new ActionStopSlider(message, 1000, TimeUnit.MILLISECONDS));
@@ -1431,8 +1449,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) throws
                                                                                                  IOException,
                                                                                                  YarnException {
-    SliderUtils.getCurrentUser();
-
+    onRpcCall("flexCluster()");
     String payload = request.getClusterSpec();
     ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser();
     ConfTree updatedResources = confTreeSerDeser.fromJson(payload);
@@ -1445,7 +1462,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     Messages.GetJSONClusterStatusRequestProto request) throws
                                                        IOException,
                                                        YarnException {
-    SliderUtils.getCurrentUser();
+    onRpcCall("getJSONClusterStatus()");
     String result;
     //quick update
     //query and json-ify
@@ -1457,14 +1474,13 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       .build();
   }
 
-
   @Override
   public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
     Messages.GetInstanceDefinitionRequestProto request) throws
                                                         IOException,
                                                         YarnException {
 
-    log.info("Received call to getInstanceDefinition()");
+    onRpcCall("getInstanceDefinition()");
     String internal;
     String resources;
     String app;
@@ -1477,7 +1493,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     assert internal != null;
     assert resources != null;
     assert app != null;
-    log.info("Generating getInstanceDefinition Response");
+    log.debug("Generating getInstanceDefinition Response");
     Messages.GetInstanceDefinitionResponseProto.Builder builder =
       Messages.GetInstanceDefinitionResponseProto.newBuilder();
     builder.setInternal(internal);
@@ -1486,12 +1502,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     return builder.build();
   }
 
-
   @Override //SliderClusterProtocol
   public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws
                                                                                                                          IOException,
                                                                                                                          YarnException {
-    SliderUtils.getCurrentUser();
+    onRpcCall("listNodeUUIDsByRole()");
     String role = request.getRole();
     Messages.ListNodeUUIDsByRoleResponseProto.Builder builder =
       Messages.ListNodeUUIDsByRoleResponseProto.newBuilder();
@@ -1506,7 +1521,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) throws
                                                                                      IOException,
                                                                                      YarnException {
-    SliderUtils.getCurrentUser();
+    onRpcCall("getNode()");
     RoleInstance instance = appState.getLiveInstanceByContainerID(
       request.getUuid());
     return Messages.GetNodeResponseProto.newBuilder()
@@ -1518,7 +1533,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public Messages.GetClusterNodesResponseProto getClusterNodes(Messages.GetClusterNodesRequestProto request) throws
                                                                                                              IOException,
                                                                                                              YarnException {
-    SliderUtils.getCurrentUser();
+    onRpcCall("getClusterNodes()");
     List<RoleInstance>
       clusterNodes = appState.getLiveInstancesByContainerIDs(
       request.getUuidList());
@@ -1536,6 +1551,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) throws
                                                                             IOException,
                                                                             YarnException {
+    onRpcCall("echo()");
     Messages.EchoResponseProto.Builder builder =
       Messages.EchoResponseProto.newBuilder();
     String text = request.getText();
@@ -1550,6 +1566,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) throws
                                                                                                        IOException,
                                                                                                        YarnException {
+    onRpcCall("killContainer()");
     String containerID = request.getId();
     log.info("Kill Container {}", containerID);
     //throws NoSuchNodeException if it is missing

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
index 66a3961..08e8086 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
@@ -20,6 +20,8 @@ package org.apache.slider.server.appmaster.actions;
 
 import org.apache.slider.server.appmaster.SliderAppMaster;
 import org.apache.slider.server.appmaster.state.AppState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
@@ -27,7 +29,9 @@ import java.util.concurrent.TimeUnit;
  * Action to tell a queue executor to stop -after handing this on/executing it
  */
 public class ActionStopQueue extends AsyncAction {
-
+  private static final Logger log =
+      LoggerFactory.getLogger(ActionStopQueue.class);
+  
   public ActionStopQueue(long delay) {
     super("stop queue", delay);
   }
@@ -47,6 +51,6 @@ public class ActionStopQueue extends AsyncAction {
   public void execute(SliderAppMaster appMaster,
       QueueAccess queueService,
       AppState appState) throws Exception {
-    // no-op
+    log.warn("STOP");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
index c8db42d..f9a1fd5 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
@@ -93,7 +93,7 @@ public abstract class AsyncAction implements Delayed {
     final StringBuilder sb =
         new StringBuilder(super.toString());
     sb.append(" name='").append(name).append('\'');
-    sb.append(", nanos=").append(getNanos());
+    sb.append(", delay=").append(getDelay(TimeUnit.SECONDS));
     sb.append(", attrs=").append(attrs);
     sb.append(", sequenceNumber=").append(sequenceNumber);
     sb.append('}');

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
index 87956db..a40b0f3 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
@@ -68,6 +68,8 @@ public class QueueExecutor implements Runnable {
         log.debug("Executing {}", take);
         
         take.execute(appMaster, actionQueues, appState);
+        log.debug("Completed {}", take);
+
       } while (!(take instanceof ActionStopQueue));
       log.info("Queue Executor run() stopped");
     } catch (Exception e) {