You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/07 23:16:30 UTC
svn commit: r1556380 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-commo...
Author: vinodkv
Date: Tue Jan 7 22:16:30 2014
New Revision: 1556380
URL: http://svn.apache.org/r1556380
Log:
YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager fail-over. Contributed by Xuan Gong.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jan 7 22:16:30 2014
@@ -200,6 +200,9 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv)
+ YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
+ fail-over. (Xuan Gong via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java Tue Jan 7 22:16:30 2014
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,13 +33,18 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +62,8 @@ public class TestRMFailover extends Clie
private Configuration conf;
private MiniYARNCluster cluster;
+ private ApplicationId fakeAppId;
+
private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value);
@@ -77,6 +86,7 @@ public class TestRMFailover extends Clie
@Before
public void setup() throws IOException {
+ fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@@ -179,4 +189,67 @@ public class TestRMFailover extends Clie
failover();
verifyConnections();
}
+
+ @Test
+ public void testWebAppProxyInStandAloneMode() throws YarnException,
+ InterruptedException, IOException {
+ WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
+ try {
+ conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+ webAppProxyServer.init(conf);
+
+ // Start webAppProxyServer
+ Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
+ webAppProxyServer.start();
+ Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
+
+ URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
+ HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+ .openConnection();
+
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+
+ explicitFailover();
+ verifyConnections();
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+ } finally {
+ webAppProxyServer.stop();
+ }
+ }
+
+ @Test
+ public void testEmbeddedWebAppProxy() throws YarnException,
+ InterruptedException, IOException {
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+ URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
+ HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+ .openConnection();
+
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+
+ explicitFailover();
+ verifyConnections();
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+ }
+
+ private void verifyExpectedException(String exceptionMessage){
+ assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class
+ .getName()));
+ assertTrue(exceptionMessage
+ .contains("Application with id '" + fakeAppId + "' " +
+ "doesn't exist in RM."));
+ }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java?rev=1556380&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java Tue Jan 7 22:16:30 2014
@@ -0,0 +1,118 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+
+import com.google.common.base.Preconditions;
+
+public class ClientRMProxy<T> extends RMProxy<T> {
+ private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
+ private static final ClientRMProxy INSTANCE = new ClientRMProxy();
+
+ private interface ClientRMProtocols extends ApplicationClientProtocol,
+ ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+ // Add nothing
+ }
+
+ private ClientRMProxy(){
+ super();
+ }
+
+ /**
+ * Create a proxy to the ResourceManager for the specified protocol.
+ * @param configuration Configuration with all the required information.
+ * @param protocol Client protocol for which proxy is being requested.
+ * @param <T> Type of proxy.
+ * @return Proxy to the ResourceManager for the specified client protocol.
+ * @throws IOException
+ */
+ public static <T> T createRMProxy(final Configuration configuration,
+ final Class<T> protocol) throws IOException {
+ return createRMProxy(configuration, protocol, INSTANCE);
+ }
+
+ private static void setupTokens(InetSocketAddress resourceManagerAddress)
+ throws IOException {
+ // It is assumed for now that the only AMRMToken in AM's UGI is for this
+ // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
+ // default service-address, see YARN-986.
+ for (Token<? extends TokenIdentifier> token : UserGroupInformation
+ .getCurrentUser().getTokens()) {
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ // This token needs to be directly provided to the AMs, so set the
+ // appropriate service-name. We'll need more infrastructure when we
+ // need to set it in HA case.
+ SecurityUtil.setTokenService(token, resourceManagerAddress);
+ }
+ }
+ }
+
+ @InterfaceAudience.Private
+ @Override
+ protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+ Class<?> protocol) throws IOException {
+ if (protocol == ApplicationClientProtocol.class) {
+ return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ } else if (protocol == ResourceManagerAdministrationProtocol.class) {
+ return conf.getSocketAddr(
+ YarnConfiguration.RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+ } else if (protocol == ApplicationMasterProtocol.class) {
+ InetSocketAddress serviceAddr =
+ conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ setupTokens(serviceAddr);
+ return serviceAddr;
+ } else {
+ String message = "Unsupported protocol found when creating the proxy " +
+ "connection to ResourceManager: " +
+ ((protocol != null) ? protocol.getClass().getName() : "null");
+ LOG.error(message);
+ throw new IllegalStateException(message);
+ }
+ }
+
+ @InterfaceAudience.Private
+ @Override
+ protected void checkAllowedProtocols(Class<?> protocol) {
+ Preconditions.checkArgument(
+ protocol.isAssignableFrom(ClientRMProtocols.class),
+ "RM does not support this client protocol");
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Jan 7 22:16:30 2014
@@ -147,9 +147,12 @@ public class ResourceManager extends Com
protected QueueACLsManager queueACLsManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
+ private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
+ private String webAppAddress;
+
/** End of Active services */
private Configuration conf;
@@ -194,6 +197,8 @@ public class ResourceManager extends Com
}
createAndInitActiveServices();
+ webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf);
+
super.serviceInit(conf);
}
@@ -437,22 +442,12 @@ public class ResourceManager extends Com
throw e;
}
}
- startWepApp();
-
- if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
- int port = webApp.port();
- WebAppUtils.setRMWebAppPort(conf, port);
- }
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
- if (webApp != null) {
- webApp.stop();
- }
-
DefaultMetricsSystem.shutdown();
@@ -752,12 +747,16 @@ public class ResourceManager extends Com
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
.withHttpSpnegoKeytabKey(
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
- .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf));
+ .at(webAppAddress);
String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
equals(proxyHostAndPort)) {
- AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
- builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
+ if (HAUtil.isHAEnabled(conf)) {
+ fetcher = new AppReportFetcher(conf);
+ } else {
+ fetcher = new AppReportFetcher(conf, getClientRMService());
+ }
+ builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
String[] proxyParts = proxyHostAndPort.split(":");
@@ -854,6 +853,11 @@ public class ResourceManager extends Com
transitionToActive();
}
+ startWepApp();
+ if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ int port = webApp.port();
+ WebAppUtils.setRMWebAppPort(conf, port);
+ }
super.serviceStart();
}
@@ -864,6 +868,12 @@ public class ResourceManager extends Com
@Override
protected void serviceStop() throws Exception {
+ if (webApp != null) {
+ webApp.stop();
+ }
+ if (fetcher != null) {
+ fetcher.stop();
+ }
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Tue Jan 7 22:16:30 2014
@@ -19,21 +19,20 @@
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
-import java.net.InetSocketAddress;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
/**
* This class abstracts away how ApplicationReports are fetched.
@@ -50,16 +49,12 @@ public class AppReportFetcher {
*/
public AppReportFetcher(Configuration conf) {
this.conf = conf;
- YarnRPC rpc = YarnRPC.create(this.conf);
- InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- applicationsManager =
- (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
- rmAddress, this.conf);
- LOG.info("Connected to ResourceManager at " + rmAddress);
+ try {
+ applicationsManager = ClientRMProxy.createRMProxy(conf,
+ ApplicationClientProtocol.class);
+ } catch (IOException e) {
+ throw new YarnRuntimeException(e);
+ }
}
/**
@@ -91,4 +86,10 @@ public class AppReportFetcher {
.getApplicationReport(request);
return response.getApplicationReport();
}
+
+ public void stop() {
+ if (this.applicationsManager != null) {
+ RPC.stopProxy(this.applicationsManager);
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Tue Jan 7 22:16:30 2014
@@ -117,6 +117,9 @@ public class WebAppProxy extends Abstrac
throw new YarnRuntimeException("Error stopping proxy web server",e);
}
}
+ if(this.fetcher != null) {
+ this.fetcher.stop();
+ }
super.serviceStop();
}