You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/12/15 10:01:26 UTC

svn commit: r1214664 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/t...

Author: mahadev
Date: Thu Dec 15 09:01:25 2011
New Revision: 1214664

URL: http://svn.apache.org/viewvc?rev=1214664&view=rev
Log:
MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster. (Anupam Seth via mahadev) - Merging r1214662 from trunk.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Dec 15 09:01:25 2011
@@ -11,6 +11,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
     Graves via vinodkv)
 
+    MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster.
+    (Anupam Seth via mahadev)
+
   IMPROVEMENTS
     MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
                     (Vinay Thota via amarrk)

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Dec 15 09:01:25 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -156,30 +157,37 @@ public class ClientServiceDelegate {
           application = rm.getApplicationReport(appId);
           continue;
         }
-        UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
-            UserGroupInformation.getCurrentUser().getUserName());
-        serviceAddr = application.getHost() + ":" + application.getRpcPort();
-        if (UserGroupInformation.isSecurityEnabled()) {
-          String clientTokenEncoded = application.getClientToken();
-          Token<ApplicationTokenIdentifier> clientToken =
-            new Token<ApplicationTokenIdentifier>();
-          clientToken.decodeFromUrlString(clientTokenEncoded);
-          // RPC layer client expects ip:port as service for tokens
-          InetSocketAddress addr = NetUtils.createSocketAddr(application
-              .getHost(), application.getRpcPort());
-          clientToken.setService(new Text(addr.getAddress().getHostAddress()
-              + ":" + addr.getPort()));
-          newUgi.addToken(clientToken);
-        }
-        LOG.info("The url to track the job: " + application.getTrackingUrl());
-        LOG.debug("Connecting to " + serviceAddr);
-        final String tempStr = serviceAddr;
-        realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
-          @Override
-          public MRClientProtocol run() throws IOException {
-            return instantiateAMProxy(tempStr);
+        if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) {
+          UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+              UserGroupInformation.getCurrentUser().getUserName());
+          serviceAddr = application.getHost() + ":" + application.getRpcPort();
+          if (UserGroupInformation.isSecurityEnabled()) {
+            String clientTokenEncoded = application.getClientToken();
+            Token<ApplicationTokenIdentifier> clientToken =
+              new Token<ApplicationTokenIdentifier>();
+            clientToken.decodeFromUrlString(clientTokenEncoded);
+            // RPC layer client expects ip:port as service for tokens
+            InetSocketAddress addr = NetUtils.createSocketAddr(application
+                .getHost(), application.getRpcPort());
+            clientToken.setService(new Text(addr.getAddress().getHostAddress()
+                + ":" + addr.getPort()));
+            newUgi.addToken(clientToken);
           }
-        });
+          LOG.info("The url to track the job: " + application.getTrackingUrl());
+          LOG.debug("Connecting to " + serviceAddr);
+          final String tempStr = serviceAddr;
+          realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+            @Override
+            public MRClientProtocol run() throws IOException {
+              return instantiateAMProxy(tempStr);
+            }
+          });
+	} else {
+           logApplicationReportInfo(application); 
+           LOG.info("Network ACL closed to AM for job " + jobId
+             + ". Redirecting to job history server.");
+           return checkAndGetHSProxy(null, JobState.RUNNING);
+        }  
         return realProxy;
       } catch (IOException e) {
         //possibly the AM has crashed
@@ -240,10 +248,55 @@ public class ClientServiceDelegate {
     return realProxy;
   }
 
+  /**
+   * Logs one INFO line summarizing the given application report: resource
+   * usage, diagnostics, start/finish time, AM host and RPC port, name, queue,
+   * user, tracking URLs and final/YARN state. Does nothing for a null report.
+   *
+   * The client token is added to the UGI used to reach the AM when security
+   * is enabled, i.e. it is a credential. Only its presence is logged, never
+   * its value, so that the log cannot be used to impersonate the client.
+   *
+   * @param application report to summarize; may be null
+   */
+  private void logApplicationReportInfo(ApplicationReport application) {
+    if (application == null) {
+      return;
+    }
+    LOG.info("AppId: " + application.getApplicationId()
+      + " # reserved containers: "
+      + application.getApplicationResourceUsageReport().getNumReservedContainers()
+      + " # used containers: "
+      + application.getApplicationResourceUsageReport().getNumUsedContainers()
+      + " Needed resources (memory): "
+      + application.getApplicationResourceUsageReport().getNeededResources().getMemory()
+      + " Reserved resources (memory): "
+      + application.getApplicationResourceUsageReport().getReservedResources().getMemory()
+      + " Used resources (memory): "
+      + application.getApplicationResourceUsageReport().getUsedResources().getMemory()
+      + " Diagnostics: " + application.getDiagnostics()
+      + " Start time: " + application.getStartTime()
+      + " Finish time: " + application.getFinishTime()
+      + " Host: " + application.getHost()
+      + " Name: " + application.getName()
+      + " Orig. tracking url: " + application.getOriginalTrackingUrl()
+      + " Queue: " + application.getQueue()
+      + " RPC port: " + application.getRpcPort()
+      + " Tracking url: " + application.getTrackingUrl()
+      + " User: " + application.getUser()
+      // Redacted on purpose: the token value must never reach the log.
+      + " Client token: "
+      + (application.getClientToken() == null ? "<none>" : "<redacted>")
+      + " Final appl. status: " + application.getFinalApplicationStatus()
+      + " Yarn appl. state: " + application.getYarnApplicationState()
+    );
+  }
+
   private MRClientProtocol checkAndGetHSProxy(
       ApplicationReport applicationReport, JobState state) {
     if (null == historyServerProxy) {
-      LOG.warn("Job History Server is not configured.");
+      LOG.warn("Job History Server is not configured or " +
+      		"job information not yet available on History Server.");
       return getNotRunningJob(applicationReport, state);
     }
     return historyServerProxy;
@@ -452,4 +505,4 @@ public class ClientServiceDelegate {
       throw new IOException("Cannot get log path for a in-progress job");
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Dec 15 09:01:25 2011
@@ -22,6 +22,8 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
 import junit.framework.Assert;
 
@@ -31,8 +33,13 @@ import org.apache.hadoop.mapreduce.JobSt
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -45,15 +52,30 @@ import org.apache.hadoop.yarn.ipc.RPCUti
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests for ClientServiceDelegate.java
  */
 
+@RunWith(value = Parameterized.class)
 public class TestClientServiceDelegate {
   private JobID oldJobId = JobID.forName("job_1315895242400_2");
   private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
       .toYarn(oldJobId);
+  private boolean isAMReachableFromClient;
+
+  public TestClientServiceDelegate(boolean isAMReachableFromClient) {
+    this.isAMReachableFromClient = isAMReachableFromClient;
+  }
+
+  /** Parameter sets for the constructor: AM reachable from the client, then not. */
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { true }, { false } });
+  }
 
   @Test
   public void testUnknownAppInRM() throws Exception {
@@ -150,9 +172,30 @@ public class TestClientServiceDelegate {
     Assert.assertEquals(1.0f, jobStatus.getMapProgress());                                        
     Assert.assertEquals(1.0f, jobStatus.getReduceProgress());                                     
   }
+  
+  /** RM has no report for the app, so counters must be served by the history server. */
+  @Test
+  public void testCountersFromHistoryServer() throws Exception {
+    MRClientProtocol hsProxy = mock(MRClientProtocol.class);
+    when(hsProxy.getCounters(getCountersRequest()))
+        .thenReturn(getCountersResponseFromHistoryServer());
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+        .thenReturn(null);
+    ClientServiceDelegate delegate = getClientServiceDelegate(hsProxy, rmDelegate);
+    Counters counters = TypeConverter.toYarn(delegate.getJobCounters(oldJobId));
+    Assert.assertNotNull(counters);
+    Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters")
+        .getCounter("dummyCounter").getValue());
+  }
 
   @Test
   public void testReconnectOnAMRestart() throws IOException {
+    //test not applicable when AM not reachable
+    //as instantiateAMProxy is not called at all
+    if(!isAMReachableFromClient) {
+      return;
+    }
 
     MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
 
@@ -186,7 +229,7 @@ public class TestClientServiceDelegate {
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
     when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
         .thenReturn(jobReportResponse2);
-
+    
     ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
         historyServerProxy, rmDelegate));
     // First time, connection should be to AM1, then to AM2. Further requests
@@ -210,13 +253,13 @@ public class TestClientServiceDelegate {
     verify(clientServiceDelegate, times(2)).instantiateAMProxy(
         any(String.class));
   }
-
+  
   private GetJobReportRequest getJobReportRequest() {
     GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
     request.setJobId(jobId);
     return request;
   }
-
+  
   private GetJobReportResponse getJobReportResponse() {
     GetJobReportResponse jobReportResponse = Records
         .newRecord(GetJobReportResponse.class);
@@ -227,6 +270,12 @@ public class TestClientServiceDelegate {
     return jobReportResponse;
   }
 
+  private GetCountersRequest getCountersRequest() {
+    GetCountersRequest request = Records.newRecord(GetCountersRequest.class);
+    request.setJobId(jobId);
+    return request;
+  }
+
   private ApplicationReport getFinishedApplicationReport() {
     return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
         1234, 5), "user", "queue", "appname", "host", 124, null,
@@ -251,6 +300,7 @@ public class TestClientServiceDelegate {
       MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
     Configuration conf = new YarnConfiguration();
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
     ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
         conf, rm, oldJobId, historyServerProxy);
     return clientServiceDelegate;
@@ -269,4 +319,21 @@ public class TestClientServiceDelegate {
     jobReportResponse.setJobReport(jobReport);
     return jobReportResponse;
   }
+  
+  /** Canned history-server reply: group "dummyCounters", counter "dummyCounter" = 1001. */
+  private GetCountersResponse getCountersResponseFromHistoryServer() {
+    Counter dummyCounter = Records.newRecord(Counter.class);
+    dummyCounter.setName("dummyCounter");
+    dummyCounter.setDisplayName("dummyCounter");
+    dummyCounter.setValue(1001);
+    CounterGroup dummyGroup = Records.newRecord(CounterGroup.class);
+    dummyGroup.setName("dummyCounters");
+    dummyGroup.setDisplayName("dummyCounters");
+    dummyGroup.setCounter("dummyCounter", dummyCounter);
+    Counters allCounters = Records.newRecord(Counters.class);
+    allCounters.setCounterGroup("dummyCounters", dummyGroup);
+    GetCountersResponse response = Records.newRecord(GetCountersResponse.class);
+    response.setCounters(allCounters);
+    return response;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Dec 15 09:01:25 2011
@@ -145,6 +145,11 @@ public class YarnConfiguration extends C
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
+  /** If true, network ACLs are closed to the AM: clients skip it and use the history server. */
+  public static final String RM_AM_NETWORK_ACL_CLOSED = 
+    RM_PREFIX + "am.acl.disabled";
+  public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false;
+
   /** The address of the RM admin interface.*/
   public static final String RM_ADMIN_ADDRESS = 
     RM_PREFIX + "admin.address";

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Thu Dec 15 09:01:25 2011
@@ -118,6 +118,12 @@
   </property>
 
   <property>
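+    <description>Set to true when network ACLs prevent clients from connecting to the MR ApplicationMaster; clients then skip the AM and are redirected to the job history server.</description>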
+    <name>yarn.resourcemanager.am.acl.disabled</name>
+    <value>false</value>
+  </property> 
+
+  <property>
     <description>The address of the RM admin interface.</description>
     <name>yarn.resourcemanager.admin.address</name>
     <value>0.0.0.0:8141</value>