You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/28 19:36:07 UTC
[39/48] hadoop git commit: YARN-4117. End to end unit test with mini
YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola
YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55ae1439
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55ae1439
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55ae1439
Branch: refs/heads/HDFS-7240
Commit: 55ae1439233e8585d624b2872e1e4753ef63eebb
Parents: 49ff54c
Author: Jian He <ji...@apache.org>
Authored: Sun Mar 27 20:22:12 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Sun Mar 27 20:22:12 2016 -0700
----------------------------------------------------------------------
.../nodemanager/amrmproxy/AMRMProxyService.java | 14 +++-
.../amrmproxy/DefaultRequestInterceptor.java | 7 ++
.../containermanager/ContainerManagerImpl.java | 49 ++++++++-----
.../hadoop/yarn/server/MiniYARNCluster.java | 75 +++++++++++++++++++-
4 files changed, 122 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index bd6538c..038c697 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements
return null;
}
+ @Private
+ public InetSocketAddress getBindAddress() {
+ return this.listenerEndpoint;
+ }
+
+ @Private
+ public AMRMProxyTokenSecretManager getSecretManager() {
+ return this.secretManager;
+ }
+
/**
* Private class for handling application stop events.
*
@@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements
* ApplicationAttemptId instances.
*
*/
- private static class RequestInterceptorChainWrapper {
+ @Private
+ public static class RequestInterceptorChainWrapper {
private RequestInterceptor rootInterceptor;
private ApplicationAttemptId applicationAttemptId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 2c7939b..4457dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Extends the AbstractRequestInterceptor class and provides an implementation
* that simply forwards the AM requests to the cluster resource manager.
@@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends
user.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
}
+
+ @VisibleForTesting
+ public void setRMClient(ApplicationMasterProtocol rmClient) {
+ this.rmClient = rmClient;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 94d5c1e..8d09aa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements
private final ReadLock readLock;
private final WriteLock writeLock;
private AMRMProxyService amrmProxyService;
- private boolean amrmProxyEnabled = false;
+ protected boolean amrmProxyEnabled = false;
private long waitForContainersOnShutdownMillis;
@@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
- amrmProxyEnabled =
- conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
-
- if (amrmProxyEnabled) {
- LOG.info("AMRMProxyService is enabled. "
- + "All the AM->RM requests will be intercepted by the proxy");
- this.amrmProxyService =
- new AMRMProxyService(this.context, this.dispatcher);
- addService(this.amrmProxyService);
- } else {
- LOG.info("AMRMProxyService is disabled");
- }
+ createAMRMProxyService(conf);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
@@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements
recover();
}
- public boolean isARMRMProxyEnabled() {
- return amrmProxyEnabled;
+ protected void createAMRMProxyService(Configuration conf) {
+ this.amrmProxyEnabled =
+ conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+ if (amrmProxyEnabled) {
+ LOG.info("AMRMProxyService is enabled. "
+ + "All the AM->RM requests will be intercepted by the proxy");
+ this.setAMRMProxyService(
+ new AMRMProxyService(this.context, this.dispatcher));
+ addService(this.getAMRMProxyService());
+ } else {
+ LOG.info("AMRMProxyService is disabled");
+ }
}
@SuppressWarnings("unchecked")
@@ -810,9 +810,9 @@ public class ContainerManagerImpl extends CompositeService implements
// Initialize the AMRMProxy service instance only if the container is of
// type AM and if the AMRMProxy service is enabled
- if (isARMRMProxyEnabled() && containerTokenIdentifier
- .getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
- this.amrmProxyService.processApplicationStartRequest(request);
+ if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
+ .equals(ContainerType.APPLICATION_MASTER)) {
+ this.getAMRMProxyService().processApplicationStartRequest(request);
}
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
@@ -1413,4 +1413,15 @@ public class ContainerManagerImpl extends CompositeService implements
public Map<String, ByteBuffer> getAuxServiceMetaData() {
return this.auxiliaryServices.getMetaData();
}
+
+ @Private
+ public AMRMProxyService getAMRMProxyService() {
+ return this.amrmProxyService;
+ }
+
+ @Private
+ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
+ this.amrmProxyService = amrmProxyService;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 74b7732..c933736 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -698,6 +707,15 @@ public class MiniYARNCluster extends CompositeService {
protected void stopRMProxy() { }
};
}
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(Context context,
+ ContainerExecutor exec, DeletionService del,
+ NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
+ return new CustomContainerManagerImpl(context, exec, del,
+ nodeStatusUpdater, metrics, dirsHandler);
+ }
}
/**
@@ -799,4 +817,55 @@ public class MiniYARNCluster extends CompositeService {
public int getNumOfResourceManager() {
return this.resourceManagers.length;
}
+
+ private class CustomContainerManagerImpl extends ContainerManagerImpl {
+
+ public CustomContainerManagerImpl(Context context, ContainerExecutor exec,
+ DeletionService del, NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+ super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
+ }
+
+ @Override
+ protected void createAMRMProxyService(Configuration conf) {
+ this.amrmProxyEnabled =
+ conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+ if (this.amrmProxyEnabled) {
+ LOG.info("CustomAMRMProxyService is enabled. "
+ + "All the AM->RM requests will be intercepted by the proxy");
+ AMRMProxyService amrmProxyService =
+ useRpc ? new AMRMProxyService(getContext(), dispatcher)
+ : new ShortCircuitedAMRMProxy(getContext(), dispatcher);
+ this.setAMRMProxyService(amrmProxyService);
+ addService(this.getAMRMProxyService());
+ } else {
+ LOG.info("CustomAMRMProxyService is disabled");
+ }
+ }
+ }
+
+ private class ShortCircuitedAMRMProxy extends AMRMProxyService {
+
+ public ShortCircuitedAMRMProxy(Context context,
+ AsyncDispatcher dispatcher) {
+ super(context, dispatcher);
+ }
+
+ @Override
+ protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
+ String user, Token<AMRMTokenIdentifier> amrmToken,
+ Token<AMRMTokenIdentifier> localToken) {
+ super.initializePipeline(applicationAttemptId, user, amrmToken,
+ localToken);
+ RequestInterceptor rt = getPipelines()
+ .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
+ if (rt instanceof DefaultRequestInterceptor) {
+ ((DefaultRequestInterceptor) rt)
+ .setRMClient(getResourceManager().getApplicationMasterService());
+ }
+ }
+
+ }
}