You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/10/06 04:57:04 UTC
[07/24] git commit: SLIDER-476 make AM queue executor non-reentrant
SLIDER-476 make AM queue executor non-reentrant
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/755bb7ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/755bb7ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/755bb7ce
Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: 755bb7ce951a97fbbbca420429b6a254576d2b38
Parents: ddde030
Author: Steve Loughran <st...@apache.org>
Authored: Thu Oct 2 14:24:46 2014 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Oct 2 14:24:46 2014 -0700
----------------------------------------------------------------------
.../server/appmaster/SliderAppMaster.java | 49 +++++++++++++-------
.../appmaster/actions/ActionStopQueue.java | 8 +++-
.../server/appmaster/actions/AsyncAction.java | 2 +-
.../server/appmaster/actions/QueueExecutor.java | 2 +
4 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 24d5eea..cc4f6fe 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -409,7 +409,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
SliderUtils.validateSliderServerEnvironment(log);
executorService = new WorkflowExecutorService<ExecutorService>("AmExecutor",
- Executors.newSingleThreadExecutor(
+ Executors.newCachedThreadPool(
new ServiceThreadFactory("AmExecutor", true)));
addService(executorService);
@@ -1242,18 +1242,24 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* It should be the only way that anything -even the AM itself on startup-
* asks for nodes.
* @param resources the resource tree
- * @throws IOException
+ * @throws SliderException slider problems, including invalid configs
+ * @throws IOException IO problems
*/
private void flexCluster(ConfTree resources)
- throws IOException, SliderInternalStateException, BadConfigException {
+ throws IOException, SliderException {
+
+ AggregateConf newConf =
+ new AggregateConf(appState.getInstanceDefinitionSnapshot());
+ newConf.setResources(resources);
+ // verify the new definition is valid
+ sliderAMProvider.validateInstanceDefinition(newConf);
+ providerService.validateInstanceDefinition(newConf);
appState.updateResourceDefinitions(resources);
// reset the scheduled windows...the values
// may have changed
appState.resetFailureCounts();
-
-
// ask for more containers if needed
reviewRequestAndReleaseNodes("flexCluster");
@@ -1416,11 +1422,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/* SliderClusterProtocol */
/* =================================================================== */
+ /**
+ * General actions to perform on a slider RPC call coming in
+ * @param operation operation to log
+ * @throws IOException problems
+ */
+ protected void onRpcCall(String operation) throws IOException {
+ // it's not clear why this is here —it has been present since the
+ // code -> git change. Leaving it in
+ SliderUtils.getCurrentUser();
+ log.debug("Received call to {}", operation);
+ }
+
@Override //SliderClusterProtocol
public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
+ onRpcCall("stopCluster()");
String message = request.getMessage();
log.info("SliderAppMasterApi.stopCluster: {}", message);
schedule(new ActionStopSlider(message, 1000, TimeUnit.MILLISECONDS));
@@ -1431,8 +1449,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
-
+ onRpcCall("flexCluster()");
String payload = request.getClusterSpec();
ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser();
ConfTree updatedResources = confTreeSerDeser.fromJson(payload);
@@ -1445,7 +1462,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
Messages.GetJSONClusterStatusRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
+ onRpcCall("getJSONClusterStatus()");
String result;
//quick update
//query and json-ify
@@ -1457,14 +1474,13 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
.build();
}
-
@Override
public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
Messages.GetInstanceDefinitionRequestProto request) throws
IOException,
YarnException {
- log.info("Received call to getInstanceDefinition()");
+ onRpcCall("getInstanceDefinition()");
String internal;
String resources;
String app;
@@ -1477,7 +1493,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
assert internal != null;
assert resources != null;
assert app != null;
- log.info("Generating getInstanceDefinition Response");
+ log.debug("Generating getInstanceDefinition Response");
Messages.GetInstanceDefinitionResponseProto.Builder builder =
Messages.GetInstanceDefinitionResponseProto.newBuilder();
builder.setInternal(internal);
@@ -1486,12 +1502,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
return builder.build();
}
-
@Override //SliderClusterProtocol
public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
+ onRpcCall("listNodeUUIDsByRole()");
String role = request.getRole();
Messages.ListNodeUUIDsByRoleResponseProto.Builder builder =
Messages.ListNodeUUIDsByRoleResponseProto.newBuilder();
@@ -1506,7 +1521,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
+ onRpcCall("getNode()");
RoleInstance instance = appState.getLiveInstanceByContainerID(
request.getUuid());
return Messages.GetNodeResponseProto.newBuilder()
@@ -1518,7 +1533,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public Messages.GetClusterNodesResponseProto getClusterNodes(Messages.GetClusterNodesRequestProto request) throws
IOException,
YarnException {
- SliderUtils.getCurrentUser();
+ onRpcCall("getClusterNodes()");
List<RoleInstance>
clusterNodes = appState.getLiveInstancesByContainerIDs(
request.getUuidList());
@@ -1536,6 +1551,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) throws
IOException,
YarnException {
+ onRpcCall("echo()");
Messages.EchoResponseProto.Builder builder =
Messages.EchoResponseProto.newBuilder();
String text = request.getText();
@@ -1550,6 +1566,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) throws
IOException,
YarnException {
+ onRpcCall("killContainer()");
String containerID = request.getId();
log.info("Kill Container {}", containerID);
//throws NoSuchNodeException if it is missing
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
index 66a3961..08e8086 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
@@ -20,6 +20,8 @@ package org.apache.slider.server.appmaster.actions;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -27,7 +29,9 @@ import java.util.concurrent.TimeUnit;
* Action to tell a queue executor to stop -after handing this on/executing it
*/
public class ActionStopQueue extends AsyncAction {
-
+ private static final Logger log =
+ LoggerFactory.getLogger(ActionStopQueue.class);
+
public ActionStopQueue(long delay) {
super("stop queue", delay);
}
@@ -47,6 +51,6 @@ public class ActionStopQueue extends AsyncAction {
public void execute(SliderAppMaster appMaster,
QueueAccess queueService,
AppState appState) throws Exception {
- // no-op
+ log.warn("STOP");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
index c8db42d..f9a1fd5 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
@@ -93,7 +93,7 @@ public abstract class AsyncAction implements Delayed {
final StringBuilder sb =
new StringBuilder(super.toString());
sb.append(" name='").append(name).append('\'');
- sb.append(", nanos=").append(getNanos());
+ sb.append(", delay=").append(getDelay(TimeUnit.SECONDS));
sb.append(", attrs=").append(attrs);
sb.append(", sequenceNumber=").append(sequenceNumber);
sb.append('}');
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
index 87956db..a40b0f3 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
@@ -68,6 +68,8 @@ public class QueueExecutor implements Runnable {
log.debug("Executing {}", take);
take.execute(appMaster, actionQueues, appState);
+ log.debug("Completed {}", take);
+
} while (!(take instanceof ActionStopQueue));
log.info("Queue Executor run() stopped");
} catch (Exception e) {