You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/28 19:36:07 UTC

[39/48] hadoop git commit: YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola

YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55ae1439
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55ae1439
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55ae1439

Branch: refs/heads/HDFS-7240
Commit: 55ae1439233e8585d624b2872e1e4753ef63eebb
Parents: 49ff54c
Author: Jian He <ji...@apache.org>
Authored: Sun Mar 27 20:22:12 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Sun Mar 27 20:22:12 2016 -0700

----------------------------------------------------------------------
 .../nodemanager/amrmproxy/AMRMProxyService.java | 14 +++-
 .../amrmproxy/DefaultRequestInterceptor.java    |  7 ++
 .../containermanager/ContainerManagerImpl.java  | 49 ++++++++-----
 .../hadoop/yarn/server/MiniYARNCluster.java     | 75 +++++++++++++++++++-
 4 files changed, 122 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index bd6538c..038c697 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements
     return null;
   }
 
+  @Private
+  public InetSocketAddress getBindAddress() {
+    return this.listenerEndpoint;
+  }
+
+  @Private
+  public AMRMProxyTokenSecretManager getSecretManager() {
+    return this.secretManager;
+  }
+
   /**
    * Private class for handling application stop events.
    *
@@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements
    * ApplicationAttemptId instances.
    *
    */
-  private static class RequestInterceptorChainWrapper {
+  @Private
+  public static class RequestInterceptorChainWrapper {
     private RequestInterceptor rootInterceptor;
     private ApplicationAttemptId applicationAttemptId;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 2c7939b..4457dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Extends the AbstractRequestInterceptor class and provides an implementation
  * that simply forwards the AM requests to the cluster resource manager.
@@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends
     user.addToken(amrmToken);
     amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
   }
+
+  @VisibleForTesting
+  public void setRMClient(ApplicationMasterProtocol rmClient) {
+    this.rmClient = rmClient;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 94d5c1e..8d09aa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private AMRMProxyService amrmProxyService;
-  private boolean amrmProxyEnabled = false;
+  protected boolean amrmProxyEnabled = false;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(sharedCacheUploader);
     dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
 
-    amrmProxyEnabled =
-        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
-            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
-
-    if (amrmProxyEnabled) {
-      LOG.info("AMRMProxyService is enabled. "
-          + "All the AM->RM requests will be intercepted by the proxy");
-      this.amrmProxyService =
-          new AMRMProxyService(this.context, this.dispatcher);
-      addService(this.amrmProxyService);
-    } else {
-      LOG.info("AMRMProxyService is disabled");
-    }
+    createAMRMProxyService(conf);
 
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
@@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements
     recover();
   }
 
-  public boolean isARMRMProxyEnabled() {
-    return amrmProxyEnabled;
+  protected void createAMRMProxyService(Configuration conf) {
+    this.amrmProxyEnabled =
+        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+    if (amrmProxyEnabled) {
+      LOG.info("AMRMProxyService is enabled. "
+          + "All the AM->RM requests will be intercepted by the proxy");
+      this.setAMRMProxyService(
+          new AMRMProxyService(this.context, this.dispatcher));
+      addService(this.getAMRMProxyService());
+    } else {
+      LOG.info("AMRMProxyService is disabled");
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -810,9 +810,9 @@ public class ContainerManagerImpl extends CompositeService implements
 
           // Initialize the AMRMProxy service instance only if the container is of
           // type AM and if the AMRMProxy service is enabled
-          if (isARMRMProxyEnabled() && containerTokenIdentifier
-              .getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
-            this.amrmProxyService.processApplicationStartRequest(request);
+          if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
+              .equals(ContainerType.APPLICATION_MASTER)) {
+            this.getAMRMProxyService().processApplicationStartRequest(request);
           }
 
           startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
@@ -1413,4 +1413,15 @@ public class ContainerManagerImpl extends CompositeService implements
   public Map<String, ByteBuffer> getAuxServiceMetaData() {
     return this.auxiliaryServices.getMetaData();
   }
+
+  @Private
+  public AMRMProxyService getAMRMProxyService() {
+    return this.amrmProxyService;
+  }
+
+  @Private
+  protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
+    this.amrmProxyService = amrmProxyService;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55ae1439/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 74b7732..c933736 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -698,6 +707,15 @@ public class MiniYARNCluster extends CompositeService {
         protected void stopRMProxy() { }
       };
     }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new CustomContainerManagerImpl(context, exec, del,
+          nodeStatusUpdater, metrics, dirsHandler);
+    }
   }
 
   /**
@@ -799,4 +817,55 @@ public class MiniYARNCluster extends CompositeService {
   public int getNumOfResourceManager() {
     return this.resourceManagers.length;
   }
+
+  private class CustomContainerManagerImpl extends ContainerManagerImpl {
+
+    public CustomContainerManagerImpl(Context context, ContainerExecutor exec,
+        DeletionService del, NodeStatusUpdater nodeStatusUpdater,
+        NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+      super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
+    }
+
+    @Override
+    protected void createAMRMProxyService(Configuration conf) {
+      this.amrmProxyEnabled =
+          conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+      if (this.amrmProxyEnabled) {
+        LOG.info("CustomAMRMProxyService is enabled. "
+            + "All the AM->RM requests will be intercepted by the proxy");
+        AMRMProxyService amrmProxyService =
+            useRpc ? new AMRMProxyService(getContext(), dispatcher)
+                : new ShortCircuitedAMRMProxy(getContext(), dispatcher);
+        this.setAMRMProxyService(amrmProxyService);
+        addService(this.getAMRMProxyService());
+      } else {
+        LOG.info("CustomAMRMProxyService is disabled");
+      }
+    }
+  }
+
+  private class ShortCircuitedAMRMProxy extends AMRMProxyService {
+
+    public ShortCircuitedAMRMProxy(Context context,
+        AsyncDispatcher dispatcher) {
+      super(context, dispatcher);
+    }
+
+    @Override
+    protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
+        String user, Token<AMRMTokenIdentifier> amrmToken,
+        Token<AMRMTokenIdentifier> localToken) {
+      super.initializePipeline(applicationAttemptId, user, amrmToken,
+          localToken);
+      RequestInterceptor rt = getPipelines()
+          .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
+      if (rt instanceof DefaultRequestInterceptor) {
+        ((DefaultRequestInterceptor) rt)
+            .setRMClient(getResourceManager().getApplicationMasterService());
+      }
+    }
+
+  }
 }