You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/12/15 10:01:26 UTC
svn commit: r1214664 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/t...
Author: mahadev
Date: Thu Dec 15 09:01:25 2011
New Revision: 1214664
URL: http://svn.apache.org/viewvc?rev=1214664&view=rev
Log:
MAPREDUCE-3251. Network ACLs can prevent some clients from talking to the MR ApplicationMaster. (Anupam Seth via mahadev) - Merging r1214662 from trunk.
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Dec 15 09:01:25 2011
@@ -11,6 +11,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
Graves via vinodkv)
+ MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster.
+ (Anupam Seth via mahadev)
+
IMPROVEMENTS
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
(Vinay Thota via amarrk)
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Dec 15 09:01:25 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.YarnExcept
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -156,30 +157,37 @@ public class ClientServiceDelegate {
application = rm.getApplicationReport(appId);
continue;
}
- UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
- UserGroupInformation.getCurrentUser().getUserName());
- serviceAddr = application.getHost() + ":" + application.getRpcPort();
- if (UserGroupInformation.isSecurityEnabled()) {
- String clientTokenEncoded = application.getClientToken();
- Token<ApplicationTokenIdentifier> clientToken =
- new Token<ApplicationTokenIdentifier>();
- clientToken.decodeFromUrlString(clientTokenEncoded);
- // RPC layer client expects ip:port as service for tokens
- InetSocketAddress addr = NetUtils.createSocketAddr(application
- .getHost(), application.getRpcPort());
- clientToken.setService(new Text(addr.getAddress().getHostAddress()
- + ":" + addr.getPort()));
- newUgi.addToken(clientToken);
- }
- LOG.info("The url to track the job: " + application.getTrackingUrl());
- LOG.debug("Connecting to " + serviceAddr);
- final String tempStr = serviceAddr;
- realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
- @Override
- public MRClientProtocol run() throws IOException {
- return instantiateAMProxy(tempStr);
+ if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) {
+ UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+ UserGroupInformation.getCurrentUser().getUserName());
+ serviceAddr = application.getHost() + ":" + application.getRpcPort();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String clientTokenEncoded = application.getClientToken();
+ Token<ApplicationTokenIdentifier> clientToken =
+ new Token<ApplicationTokenIdentifier>();
+ clientToken.decodeFromUrlString(clientTokenEncoded);
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr = NetUtils.createSocketAddr(application
+ .getHost(), application.getRpcPort());
+ clientToken.setService(new Text(addr.getAddress().getHostAddress()
+ + ":" + addr.getPort()));
+ newUgi.addToken(clientToken);
}
- });
+ LOG.info("The url to track the job: " + application.getTrackingUrl());
+ LOG.debug("Connecting to " + serviceAddr);
+ final String tempStr = serviceAddr;
+ realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() throws IOException {
+ return instantiateAMProxy(tempStr);
+ }
+ });
+ } else {
+ logApplicationReportInfo(application);
+ LOG.info("Network ACL closed to AM for job " + jobId
+ + ". Redirecting to job history server.");
+ return checkAndGetHSProxy(null, JobState.RUNNING);
+ }
return realProxy;
} catch (IOException e) {
//possibly the AM has crashed
@@ -240,10 +248,55 @@ public class ClientServiceDelegate {
return realProxy;
}
+ private void logApplicationReportInfo(ApplicationReport application) {
+ if(application == null) {
+ return;
+ }
+ LOG.info("AppId: " + application.getApplicationId()
+ + " # reserved containers: "
+ + application.getApplicationResourceUsageReport().getNumReservedContainers()
+ + " # used containers: "
+ + application.getApplicationResourceUsageReport().getNumUsedContainers()
+ + " Needed resources (memory): "
+ + application.getApplicationResourceUsageReport().getNeededResources().getMemory()
+ + " Reserved resources (memory): "
+ + application.getApplicationResourceUsageReport().getReservedResources().getMemory()
+ + " Used resources (memory): "
+ + application.getApplicationResourceUsageReport().getUsedResources().getMemory()
+ + " Diagnostics: "
+ + application.getDiagnostics()
+ + " Start time: "
+ + application.getStartTime()
+ + " Finish time: "
+ + application.getFinishTime()
+ + " Host: "
+ + application.getHost()
+ + " Name: "
+ + application.getName()
+ + " Orig. tracking url: "
+ + application.getOriginalTrackingUrl()
+ + " Queue: "
+ + application.getQueue()
+ + " RPC port: "
+ + application.getRpcPort()
+ + " Tracking url: "
+ + application.getTrackingUrl()
+ + " User: "
+ + application.getUser()
+ + " Client token: "
+ + application.getClientToken()
+ + " Final appl. status: "
+ + application.getFinalApplicationStatus()
+ + " Yarn appl. state: "
+ + application.getYarnApplicationState()
+ );
+ }
+
private MRClientProtocol checkAndGetHSProxy(
ApplicationReport applicationReport, JobState state) {
if (null == historyServerProxy) {
- LOG.warn("Job History Server is not configured.");
+ LOG.warn("Job History Server is not configured or " +
+ "job information not yet available on History Server.");
return getNotRunningJob(applicationReport, state);
}
return historyServerProxy;
@@ -452,4 +505,4 @@ public class ClientServiceDelegate {
throw new IOException("Cannot get log path for a in-progress job");
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Dec 15 09:01:25 2011
@@ -22,6 +22,8 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import junit.framework.Assert;
@@ -31,8 +33,13 @@ import org.apache.hadoop.mapreduce.JobSt
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -45,15 +52,30 @@ import org.apache.hadoop.yarn.ipc.RPCUti
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* Tests for ClientServiceDelegate.java
*/
+@RunWith(value = Parameterized.class)
public class TestClientServiceDelegate {
private JobID oldJobId = JobID.forName("job_1315895242400_2");
private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
.toYarn(oldJobId);
+ private boolean isAMReachableFromClient;
+
+ public TestClientServiceDelegate(boolean isAMReachableFromClient) {
+ this.isAMReachableFromClient = isAMReachableFromClient;
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Object[][] data = new Object[][] { { true }, { false } };
+ return Arrays.asList(data);
+ }
@Test
public void testUnknownAppInRM() throws Exception {
@@ -150,9 +172,30 @@ public class TestClientServiceDelegate {
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
}
+
+ @Test
+ public void testCountersFromHistoryServer() throws Exception {
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getCounters(getCountersRequest())).thenReturn(
+ getCountersResponseFromHistoryServer());
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+ .thenReturn(null);
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ historyServerProxy, rm);
+
+ Counters counters = TypeConverter.toYarn(clientServiceDelegate.getJobCounters(oldJobId));
+ Assert.assertNotNull(counters);
+ Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters").getCounter("dummyCounter").getValue());
+ }
@Test
public void testReconnectOnAMRestart() throws IOException {
+ //test not applicable when AM not reachable
+ //as instantiateAMProxy is not called at all
+ if(!isAMReachableFromClient) {
+ return;
+ }
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
@@ -186,7 +229,7 @@ public class TestClientServiceDelegate {
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(jobReportResponse2);
-
+
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
historyServerProxy, rmDelegate));
// First time, connection should be to AM1, then to AM2. Further requests
@@ -210,13 +253,13 @@ public class TestClientServiceDelegate {
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
any(String.class));
}
-
+
private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId);
return request;
}
-
+
private GetJobReportResponse getJobReportResponse() {
GetJobReportResponse jobReportResponse = Records
.newRecord(GetJobReportResponse.class);
@@ -227,6 +270,12 @@ public class TestClientServiceDelegate {
return jobReportResponse;
}
+ private GetCountersRequest getCountersRequest() {
+ GetCountersRequest request = Records.newRecord(GetCountersRequest.class);
+ request.setJobId(jobId);
+ return request;
+ }
+
private ApplicationReport getFinishedApplicationReport() {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", "host", 124, null,
@@ -251,6 +300,7 @@ public class TestClientServiceDelegate {
MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rm, oldJobId, historyServerProxy);
return clientServiceDelegate;
@@ -269,4 +319,21 @@ public class TestClientServiceDelegate {
jobReportResponse.setJobReport(jobReport);
return jobReportResponse;
}
+
+ private GetCountersResponse getCountersResponseFromHistoryServer() {
+ GetCountersResponse countersResponse = Records
+ .newRecord(GetCountersResponse.class);
+ Counter counter = Records.newRecord(Counter.class);
+ CounterGroup counterGroup = Records.newRecord(CounterGroup.class);
+ Counters counters = Records.newRecord(Counters.class);
+ counter.setDisplayName("dummyCounter");
+ counter.setName("dummyCounter");
+ counter.setValue(1001);
+ counterGroup.setName("dummyCounters");
+ counterGroup.setDisplayName("dummyCounters");
+ counterGroup.setCounter("dummyCounter", counter);
+ counters.setCounterGroup("dummyCounters", counterGroup);
+ countersResponse.setCounters(counters);
+ return countersResponse;
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Dec 15 09:01:25 2011
@@ -145,6 +145,11 @@ public class YarnConfiguration extends C
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
+ /** RM-AM ACL disabled. **/
+ public static final String RM_AM_NETWORK_ACL_CLOSED =
+ RM_PREFIX + "am.acl.disabled";
+ public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false;
+
/** The address of the RM admin interface.*/
public static final String RM_ADMIN_ADDRESS =
RM_PREFIX + "admin.address";
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1214664&r1=1214663&r2=1214664&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Thu Dec 15 09:01:25 2011
@@ -118,6 +118,12 @@
</property>
<property>
+ <description>Network ACL to AM closed.</description>
+ <name>yarn.resourcemanager.am.acl.disabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
<description>The address of the RM admin interface.</description>
<name>yarn.resourcemanager.admin.address</name>
<value>0.0.0.0:8141</value>