You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/07/19 22:35:39 UTC
[47/50] [abbrv] hadoop git commit: YARN-6777. Support for
ApplicationMasterService processing chain of interceptors. (asuresh)
YARN-6777. Support for ApplicationMasterService processing chain of interceptors. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/077fcf6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/077fcf6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/077fcf6a
Branch: refs/heads/HDFS-7240
Commit: 077fcf6a96e420e7f36350931722b8603d010cf1
Parents: 3556e36
Author: Arun Suresh <as...@apache.org>
Authored: Mon Jul 17 17:02:22 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jul 19 12:26:40 2017 -0700
----------------------------------------------------------------------
.../ams/ApplicationMasterServiceContext.java | 29 ++++
.../ams/ApplicationMasterServiceProcessor.java | 30 ++--
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +-
.../src/main/resources/yarn-default.xml | 10 ++
.../resourcemanager/AMSProcessingChain.java | 102 ++++++++++++
.../ApplicationMasterService.java | 49 ++++--
.../resourcemanager/DefaultAMSProcessor.java | 69 ++++----
...pportunisticContainerAllocatorAMService.java | 67 +++++---
.../yarn/server/resourcemanager/RMContext.java | 3 +-
.../TestApplicationMasterService.java | 163 ++++++++++++++++++-
10 files changed, 446 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
new file mode 100644
index 0000000..988c727
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.ams;
+
+/**
+ * This is a marker interface for a context object that is injected into
+ * the ApplicationMasterService processor. The processor implementation
+ * is free to type cast this based on the availability of the context's
+ * implementation in the classpath.
+ */
+public interface ApplicationMasterServiceContext {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
index b426f48..b7d925a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
@@ -38,34 +38,44 @@ import java.io.IOException;
public interface ApplicationMasterServiceProcessor {
/**
+ * Initialize with an ApplicationMasterService Context as well as the
+ * next processor in the chain.
+ * @param amsContext AMSContext.
+ * @param nextProcessor next ApplicationMasterServiceProcessor
+ */
+ void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor);
+
+ /**
* Register AM attempt.
* @param applicationAttemptId applicationAttemptId.
* @param request Register Request.
- * @return Register Response.
+ * @param response Register Response.
* @throws IOException IOException.
*/
- RegisterApplicationMasterResponse registerApplicationMaster(
+ void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
- RegisterApplicationMasterRequest request) throws IOException;
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response) throws IOException;
/**
* Allocate call.
* @param appAttemptId appAttemptId.
* @param request Allocate Request.
- * @return Allocate Response.
+ * @param response Allocate Response.
* @throws YarnException YarnException.
*/
- AllocateResponse allocate(ApplicationAttemptId appAttemptId,
- AllocateRequest request) throws YarnException;
+ void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException;
/**
* Finish AM.
* @param applicationAttemptId applicationAttemptId.
* @param request Finish AM Request.
- * @return Finish AM response.
+ * @param response Finish AM Response.
*/
- FinishApplicationMasterResponse finishApplicationMaster(
+ void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
- FinishApplicationMasterRequest request);
-
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 01eff64..93437e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -103,7 +103,7 @@ public class YarnConfiguration extends Configuration {
YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled";
public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false;
-
+
////////////////////////////////
// IPC Configs
////////////////////////////////
@@ -150,6 +150,9 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT;
+ public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
+ RM_PREFIX + "application-master-service.processors";
+
/** The actual bind address for the RM.*/
public static final String RM_BIND_HOST =
RM_PREFIX + "bind-host";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0588c6c..7ddcfcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -123,6 +123,16 @@
</property>
<property>
+ <description>
+ Comma separated class names of ApplicationMasterServiceProcessor
+ implementations. The processors will be applied in the order
+ they are specified.
+ </description>
+ <name>yarn.resourcemanager.application-master-service.processors</name>
+ <value></value>
+ </property>
+
+ <property>
<description>
This configures the HTTP endpoint for Yarn Daemons.The following
values are supported:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
new file mode 100644
index 0000000..931b1c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.io.IOException;
+
+/**
+ * This maintains a chain of {@link ApplicationMasterServiceProcessor}s.
+ */
+class AMSProcessingChain implements ApplicationMasterServiceProcessor {
+
+ private static final Log LOG = LogFactory.getLog(AMSProcessingChain.class);
+
+ private ApplicationMasterServiceProcessor head;
+ private RMContext rmContext;
+
+ /**
+ * This has to be initialized with at least one processor.
+ * @param rootProcessor Root processor.
+ */
+ AMSProcessingChain(ApplicationMasterServiceProcessor rootProcessor) {
+ if (rootProcessor == null) {
+ throw new YarnRuntimeException("No root ApplicationMasterService" +
+ "Processor specified for the processing chain..");
+ }
+ this.head = rootProcessor;
+ }
+
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor) {
+ LOG.info("Initializing AMS Processing chain. Root Processor=["
+ + this.head.getClass().getName() + "].");
+ this.rmContext = (RMContext)amsContext;
+ // The head is initialized with a null 'next' processor
+ this.head.init(amsContext, null);
+ }
+
+ /**
+ * Add a processor to the top of the chain.
+ * @param processor ApplicationMasterServiceProcessor
+ */
+ public synchronized void addProcessor(
+ ApplicationMasterServiceProcessor processor) {
+ LOG.info("Adding [" + processor.getClass().getName() + "] tp top of" +
+ " AMS Processing chain. ");
+ processor.init(this.rmContext, this.head);
+ this.head = processor;
+ }
+
+ @Override
+ public void registerApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse resp) throws IOException {
+ this.head.registerApplicationMaster(applicationAttemptId, request, resp);
+ }
+
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
+ this.head.allocate(appAttemptId, request, response);
+ }
+
+ @Override
+ public void finishApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ this.head.finishApplicationMaster(applicationAttemptId, request, response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index fe8b83c..76a1640 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -88,7 +89,7 @@ public class ApplicationMasterService extends AbstractService implements
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
protected final RMContext rmContext;
- private final ApplicationMasterServiceProcessor amsProcessor;
+ private final AMSProcessingChain amsProcessingChain;
public ApplicationMasterService(RMContext rmContext,
YarnScheduler scheduler) {
@@ -101,11 +102,7 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.rmContext = rmContext;
- this.amsProcessor = createProcessor();
- }
-
- protected ApplicationMasterServiceProcessor createProcessor() {
- return new DefaultAMSProcessor(rmContext, rScheduler);
+ this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor());
}
@Override
@@ -115,6 +112,21 @@ public class ApplicationMasterService extends AbstractService implements
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ amsProcessingChain.init(rmContext, null);
+ List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
+ if (processors != null) {
+ Collections.reverse(processors);
+ for (ApplicationMasterServiceProcessor p : processors) {
+ this.amsProcessingChain.addProcessor(p);
+ }
+ }
+ }
+
+ protected List<ApplicationMasterServiceProcessor> getProcessorList(
+ Configuration conf) {
+ return conf.getInstances(
+ YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+ ApplicationMasterServiceProcessor.class);
}
@Override
@@ -165,6 +177,10 @@ public class ApplicationMasterService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
}
+ protected AMSProcessingChain getProcessingChain() {
+ return this.amsProcessingChain;
+ }
+
@Private
public InetSocketAddress getBindAddress() {
return this.masterServiceAddress;
@@ -214,8 +230,12 @@ public class ApplicationMasterService extends AbstractService implements
lastResponse.setResponseId(0);
lock.setAllocateResponse(lastResponse);
- return this.amsProcessor.registerApplicationMaster(
- amrmTokenIdentifier.getApplicationAttemptId(), request);
+ RegisterApplicationMasterResponse response =
+ recordFactory.newRecordInstance(
+ RegisterApplicationMasterResponse.class);
+ this.amsProcessingChain.registerApplicationMaster(
+ amrmTokenIdentifier.getApplicationAttemptId(), request, response);
+ return response;
}
}
@@ -265,8 +285,11 @@ public class ApplicationMasterService extends AbstractService implements
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
- return this.amsProcessor.finishApplicationMaster(
- applicationAttemptId, request);
+ FinishApplicationMasterResponse response =
+ FinishApplicationMasterResponse.newInstance(false);
+ this.amsProcessingChain.finishApplicationMaster(
+ applicationAttemptId, request, response);
+ return response;
}
}
@@ -346,8 +369,10 @@ public class ApplicationMasterService extends AbstractService implements
throw new InvalidApplicationMasterRequestException(message);
}
- AllocateResponse response = this.amsProcessor.allocate(
- amrmTokenIdentifier.getApplicationAttemptId(), request);
+ AllocateResponse response =
+ recordFactory.newRecordInstance(AllocateResponse.class);
+ this.amsProcessingChain.allocate(
+ amrmTokenIdentifier.getApplicationAttemptId(), request, response);
// update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 6eb1fba..052ec22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -81,7 +82,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
+/**
+ * This is the default Application Master Service processor. It has to be the
+ * last processor in the {@link AMSProcessingChain}.
+ */
+final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class);
@@ -93,17 +98,19 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private final RMContext rmContext;
- private final YarnScheduler scheduler;
+ private RMContext rmContext;
- DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) {
- this.rmContext = rmContext;
- this.scheduler = scheduler;
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor) {
+ this.rmContext = (RMContext)amsContext;
}
- public RegisterApplicationMasterResponse registerApplicationMaster(
+ @Override
+ public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
- RegisterApplicationMasterRequest request) throws IOException {
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response) throws IOException {
RMApp app = getRmContext().getRMApps().get(
applicationAttemptId.getApplicationId());
@@ -116,8 +123,6 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
RMAuditLogger.AuditConstants.REGISTER_AM,
"ApplicationMasterService", app.getApplicationId(),
applicationAttemptId);
- RegisterApplicationMasterResponse response = recordFactory
- .newRecordInstance(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(getScheduler()
.getMaximumResourceCapability(app.getQueue()));
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
@@ -165,11 +170,11 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
response.setSchedulerResourceTypes(getScheduler()
.getSchedulingResourceTypes());
- return response;
}
- public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
- AllocateRequest request) throws YarnException {
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
handleProgress(appAttemptId, request);
@@ -259,50 +264,46 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
- AllocateResponse allocateResponse =
- recordFactory.newRecordInstance(AllocateResponse.class);
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
- allocateResponse.setNMTokens(allocation.getNMTokens());
+ response.setNMTokens(allocation.getNMTokens());
}
// Notify the AM of container update errors
ApplicationMasterServiceUtils.addToUpdateContainerErrors(
- allocateResponse, updateErrors);
+ response, updateErrors);
// update the response with the deltas of node status changes
- handleNodeUpdates(app, allocateResponse);
+ handleNodeUpdates(app, response);
ApplicationMasterServiceUtils.addToAllocatedContainers(
- allocateResponse, allocation.getContainers());
+ response, allocation.getContainers());
- allocateResponse.setCompletedContainersStatuses(appAttempt
+ response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
- allocateResponse.setAvailableResources(allocation.getResourceLimit());
+ response.setAvailableResources(allocation.getResourceLimit());
- addToContainerUpdates(allocateResponse, allocation,
+ addToContainerUpdates(response, allocation,
((AbstractYarnScheduler)getScheduler())
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
- allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes());
+ response.setNumClusterNodes(getScheduler().getNumClusterNodes());
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(
getRmContext().getYarnConfiguration())) {
- allocateResponse.setCollectorAddr(
+ response.setCollectorAddr(
getRmContext().getRMApps().get(appAttemptId.getApplicationId())
.getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
- allocateResponse
- .setPreemptionMessage(generatePreemptionMessage(allocation));
+ response.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
- allocateResponse.setApplicationPriority(app
+ response.setApplicationPriority(app
.getApplicationPriority());
- return allocateResponse;
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
@@ -351,20 +352,20 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
.getProgress()));
}
- public FinishApplicationMasterResponse finishApplicationMaster(
+ @Override
+ public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
- FinishApplicationMasterRequest request) {
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
RMApp app =
getRmContext().getRMApps().get(applicationAttemptId.getApplicationId());
// For UnmanagedAMs, return true so they don't retry
- FinishApplicationMasterResponse response =
- FinishApplicationMasterResponse.newInstance(
+ response.setIsUnregistered(
app.getApplicationSubmissionContext().getUnmanagedAM());
getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
- return response;
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@@ -424,7 +425,7 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
}
protected YarnScheduler getScheduler() {
- return scheduler;
+ return rmContext.getScheduler();
}
private static void addToContainerUpdates(AllocateResponse allocateResponse,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index e03d944..3c278de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -23,9 +23,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -101,17 +105,29 @@ public class OpportunisticContainerAllocatorAMService
private volatile List<RemoteNode> cachedNodes;
private volatile long lastCacheUpdateTime;
- class OpportunisticAMSProcessor extends DefaultAMSProcessor {
+ class OpportunisticAMSProcessor implements
+ ApplicationMasterServiceProcessor {
- OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler
- scheduler) {
- super(rmContext, scheduler);
+ private ApplicationMasterServiceContext context;
+ private ApplicationMasterServiceProcessor nextProcessor;
+
+ private YarnScheduler getScheduler() {
+ return ((RMContext)context).getScheduler();
}
@Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor next) {
+ this.context = amsContext;
+ // The AMSProcessingChain guarantees that 'next' is not null.
+ this.nextProcessor = next;
+ }
+
+ @Override
+ public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
- RegisterApplicationMasterRequest request) throws IOException {
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response) throws IOException {
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
getScheduler()).getApplicationAttempt(applicationAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) {
@@ -135,12 +151,14 @@ public class OpportunisticContainerAllocatorAMService
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
- return super.registerApplicationMaster(applicationAttemptId, request);
+ nextProcessor.registerApplicationMaster(
+ applicationAttemptId, request, response);
}
@Override
- public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
- AllocateRequest request) throws YarnException {
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response)
+ throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks =
@@ -165,17 +183,22 @@ public class OpportunisticContainerAllocatorAMService
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
+ ApplicationMasterServiceUtils.addToAllocatedContainers(
+ response, oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
+ nextProcessor.allocate(appAttemptId, request, response);
+ }
- AllocateResponse response = super.allocate(appAttemptId, request);
- if (!oppContainers.isEmpty()) {
- ApplicationMasterServiceUtils.addToAllocatedContainers(
- response, oppContainers);
- }
- return response;
+ @Override
+ public void finishApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ nextProcessor.finishApplicationMaster(applicationAttemptId,
+ request, response);
}
}
@@ -237,11 +260,6 @@ public class OpportunisticContainerAllocatorAMService
}
@Override
- protected ApplicationMasterServiceProcessor createProcessor() {
- return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler());
- }
-
- @Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
@@ -262,6 +280,15 @@ public class OpportunisticContainerAllocatorAMService
}
@Override
+ protected List<ApplicationMasterServiceProcessor> getProcessorList(
+ Configuration conf) {
+ List<ApplicationMasterServiceProcessor> retVal =
+ super.getProcessorList(conf);
+ retVal.add(new OpportunisticAMSProcessor());
+ return retVal;
+ }
+
+ @Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ba6b915..0ea9516 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
@@ -53,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC
/**
* Context of the ResourceManager.
*/
-public interface RMContext {
+public interface RMContext extends ApplicationMasterServiceContext {
Dispatcher getDispatcher();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 18c49bd..85a36e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -20,20 +20,29 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import static java.lang.Thread.sleep;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
@@ -44,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -61,7 +71,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
public class TestApplicationMasterService {
@@ -71,13 +81,160 @@ public class TestApplicationMasterService {
private final int GB = 1024;
private static YarnConfiguration conf;
- @BeforeClass
- public static void setup() {
+ private static AtomicInteger beforeRegCount = new AtomicInteger(0);
+ private static AtomicInteger afterRegCount = new AtomicInteger(0);
+ private static AtomicInteger beforeAllocCount = new AtomicInteger(0);
+ private static AtomicInteger afterAllocCount = new AtomicInteger(0);
+ private static AtomicInteger beforeFinishCount = new AtomicInteger(0);
+ private static AtomicInteger afterFinishCount = new AtomicInteger(0);
+ private static AtomicInteger initCount = new AtomicInteger(0);
+
+ static class TestInterceptor1 implements
+ ApplicationMasterServiceProcessor {
+
+ private ApplicationMasterServiceProcessor nextProcessor;
+
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor next) {
+ initCount.incrementAndGet();
+ this.nextProcessor = next;
+ }
+
+ @Override
+ public void registerApplicationMaster(ApplicationAttemptId
+ applicationAttemptId, RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response) throws IOException {
+ nextProcessor.registerApplicationMaster(
+ applicationAttemptId, request, response);
+ }
+
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request,
+ AllocateResponse response) throws YarnException {
+ beforeAllocCount.incrementAndGet();
+ nextProcessor.allocate(appAttemptId, request, response);
+ afterAllocCount.incrementAndGet();
+ }
+
+ @Override
+ public void finishApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ beforeFinishCount.incrementAndGet();
+ afterFinishCount.incrementAndGet();
+ }
+ }
+
+ static class TestInterceptor2 implements
+ ApplicationMasterServiceProcessor {
+
+ private ApplicationMasterServiceProcessor nextProcessor;
+
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor next) {
+ initCount.incrementAndGet();
+ this.nextProcessor = next;
+ }
+
+ @Override
+ public void registerApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response) throws IOException {
+ beforeRegCount.incrementAndGet();
+ nextProcessor.registerApplicationMaster(applicationAttemptId,
+ request, response);
+ afterRegCount.incrementAndGet();
+ }
+
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response)
+ throws YarnException {
+ beforeAllocCount.incrementAndGet();
+ nextProcessor.allocate(appAttemptId, request, response);
+ afterAllocCount.incrementAndGet();
+ }
+
+ @Override
+ public void finishApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ beforeFinishCount.incrementAndGet();
+ nextProcessor.finishApplicationMaster(
+ applicationAttemptId, request, response);
+ afterFinishCount.incrementAndGet();
+ }
+ }
+
+ @Before
+ public void setup() {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
}
+ @Test(timeout = 300000)
+ public void testApplicationMasterInterceptor() throws Exception {
+ conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+ TestInterceptor1.class.getName() + ","
+ + TestInterceptor2.class.getName());
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ // Register node1
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+ // Submit an application
+ RMApp app1 = rm.submitApp(2048);
+
+ // kick the scheduler
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+ int allocCount = 0;
+
+ am1.addRequests(new String[] {"127.0.0.1"}, GB, 1, 1);
+ AllocateResponse alloc1Response = am1.schedule(); // send the request
+ allocCount++;
+
+ // kick the scheduler
+ nm1.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ sleep(1000);
+ alloc1Response = am1.schedule();
+ allocCount++;
+ }
+
+ // assert RMIdentifier is set properly in allocated containers
+ Container allocatedContainer =
+ alloc1Response.getAllocatedContainers().get(0);
+ ContainerTokenIdentifier tokenId =
+ BuilderUtils.newContainerTokenIdentifier(allocatedContainer
+ .getContainerToken());
+ am1.unregisterAppAttempt();
+
+ Assert.assertEquals(1, beforeRegCount.get());
+ Assert.assertEquals(1, afterRegCount.get());
+
+ // Both interceptors count every allocate call, so expect two per call
+ Assert.assertEquals(allocCount * 2, beforeAllocCount.get());
+ Assert.assertEquals(allocCount * 2, afterAllocCount.get());
+
+ // Finish should only be counted once, since TestInterceptor1
+ // does not forward the call to the next processor.
+ Assert.assertEquals(1, beforeFinishCount.get());
+ Assert.assertEquals(1, afterFinishCount.get());
+ rm.stop();
+ }
+
@Test(timeout = 3000000)
public void testRMIdentifierOnContainerAllocation() throws Exception {
MockRM rm = new MockRM(conf);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org