You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/08 15:36:17 UTC
svn commit: r1556552 [2/2] - in
/hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-api/src/main/proto/server/ hadoop-yarn/had...
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1556552&r1=1556551&r2=1556552&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Wed Jan 8 14:36:09 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -98,6 +99,7 @@ public class MiniYARNCluster extends Com
private boolean useFixedPorts;
private boolean useRpc = false;
+ private int failoverTimeout;
private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
@@ -189,12 +191,15 @@ public class MiniYARNCluster extends Com
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+ failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+ YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
if (useRpc && !useFixedPorts) {
throw new YarnRuntimeException("Invalid configuration!" +
" Minicluster can use rpc only when configured to use fixed ports");
}
+ conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
if (resourceManagers.length > 1) {
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
@@ -218,6 +223,13 @@ public class MiniYARNCluster extends Com
// Don't try to login using keytab in the testcases.
}
};
+ if (!useFixedPorts) {
+ if (HAUtil.isHAEnabled(conf)) {
+ setHARMConfiguration(i, conf);
+ } else {
+ setNonHARMConfiguration(conf);
+ }
+ }
addService(new ResourceManagerWrapper(i));
}
for(int index = 0; index < nodeManagers.length; index++) {
@@ -230,18 +242,103 @@ public class MiniYARNCluster extends Com
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
+ private void setNonHARMConfiguration(Configuration conf) {
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
+ WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+ }
+
+ private void setHARMConfiguration(final int index, Configuration conf) {
+ String hostname = MiniYARNCluster.getHostname();
+ for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
+ conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0");
+ }
+ }
+
+ private synchronized void initResourceManager(int index, Configuration conf) {
+ if (HAUtil.isHAEnabled(conf)) {
+ conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+ }
+ resourceManagers[index].init(conf);
+ resourceManagers[index].getRMContext().getDispatcher().register(
+ RMAppAttemptEventType.class,
+ new EventHandler<RMAppAttemptEvent>() {
+ public void handle(RMAppAttemptEvent event) {
+ if (event instanceof RMAppAttemptRegistrationEvent) {
+ appMasters.put(event.getApplicationAttemptId(),
+ event.getTimestamp());
+ } else if (event instanceof RMAppAttemptUnregistrationEvent) {
+ appMasters.remove(event.getApplicationAttemptId());
+ }
+ }
+ });
+ }
+
+ private synchronized void startResourceManager(final int index) {
+ try {
+ Thread rmThread = new Thread() {
+ public void run() {
+ resourceManagers[index].start();
+ }
+ };
+ rmThread.setName("RM-" + index);
+ rmThread.start();
+ int waitCount = 0;
+ while (resourceManagers[index].getServiceState() == STATE.INITED
+ && waitCount++ < 60) {
+ LOG.info("Waiting for RM to start...");
+ Thread.sleep(1500);
+ }
+ if (resourceManagers[index].getServiceState() != STATE.STARTED) {
+ // RM could have failed.
+ throw new IOException(
+ "ResourceManager failed to start. Final state is "
+ + resourceManagers[index].getServiceState());
+ }
+ } catch (Throwable t) {
+ throw new YarnRuntimeException(t);
+ }
+ LOG.info("MiniYARN ResourceManager address: " +
+ getConfig().get(YarnConfiguration.RM_ADDRESS));
+ LOG.info("MiniYARN ResourceManager web address: " +
+ WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public synchronized void stopResourceManager(int index) {
+ if (resourceManagers[index] != null) {
+ resourceManagers[index].stop();
+ resourceManagers[index] = null;
+ }
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public synchronized void restartResourceManager(int index)
+ throws InterruptedException {
+ if (resourceManagers[index] != null) {
+ resourceManagers[index].stop();
+ resourceManagers[index] = null;
+ }
+ Configuration conf = getConfig();
+ resourceManagers[index] = new ResourceManager();
+ initResourceManager(index, getConfig());
+ startResourceManager(index);
+ }
+
public File getTestWorkDir() {
return testWorkDir;
}
/**
- * In a HA cluster, go through all the RMs and find the Active RM. If none
- * of them are active, wait upto 5 seconds for them to transition to Active.
- *
- * In an non-HA cluster, return the index of the only RM.
+ * In a HA cluster, go through all the RMs and find the Active RM. In a
+ * non-HA cluster, return the index of the only RM.
*
- * @return index of the active RM or -1 if none of them transition to
- * active even after 5 seconds of waiting
+ * @return index of the active RM or -1 if none of them turn active
*/
@InterfaceAudience.Private
@VisibleForTesting
@@ -250,9 +347,12 @@ public class MiniYARNCluster extends Com
return 0;
}
- int numRetriesForRMBecomingActive = 5;
+ int numRetriesForRMBecomingActive = failoverTimeout / 100;
while (numRetriesForRMBecomingActive-- > 0) {
for (int i = 0; i < resourceManagers.length; i++) {
+ if (resourceManagers[i] == null) {
+ continue;
+ }
try {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
resourceManagers[i].getRMContext().getRMAdminService()
@@ -265,7 +365,7 @@ public class MiniYARNCluster extends Com
}
}
try {
- Thread.sleep(1000);
+ Thread.sleep(100);
} catch (InterruptedException e) {
throw new YarnRuntimeException("Interrupted while waiting for one " +
"of the ResourceManagers to become active");
@@ -282,7 +382,7 @@ public class MiniYARNCluster extends Com
int activeRMIndex = getActiveRMIndex();
return activeRMIndex == -1
? null
- : this.resourceManagers[getActiveRMIndex()];
+ : this.resourceManagers[activeRMIndex];
}
public ResourceManager getResourceManager(int i) {
@@ -310,82 +410,21 @@ public class MiniYARNCluster extends Com
index = i;
}
- private void setNonHARMConfiguration(Configuration conf) {
- String hostname = MiniYARNCluster.getHostname();
- conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
- WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
- }
-
- private void setHARMConfiguration(Configuration conf) {
- String hostname = MiniYARNCluster.getHostname();
- for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
- for (String id : HAUtil.getRMHAIds(conf)) {
- conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
- }
- }
- }
-
@Override
protected synchronized void serviceInit(Configuration conf)
throws Exception {
- conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
-
- if (!useFixedPorts) {
- if (HAUtil.isHAEnabled(conf)) {
- setHARMConfiguration(conf);
- } else {
- setNonHARMConfiguration(conf);
- }
- }
- if (HAUtil.isHAEnabled(conf)) {
- conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
- }
- resourceManagers[index].init(conf);
- resourceManagers[index].getRMContext().getDispatcher().register
- (RMAppAttemptEventType.class,
- new EventHandler<RMAppAttemptEvent>() {
- public void handle(RMAppAttemptEvent event) {
- if (event instanceof RMAppAttemptRegistrationEvent) {
- appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
- } else if (event instanceof RMAppAttemptUnregistrationEvent) {
- appMasters.remove(event.getApplicationAttemptId());
- }
- }
- });
+ initResourceManager(index, conf);
super.serviceInit(conf);
}
@Override
protected synchronized void serviceStart() throws Exception {
- try {
- new Thread() {
- public void run() {
- resourceManagers[index].start();
- }
- }.start();
- int waitCount = 0;
- while (resourceManagers[index].getServiceState() == STATE.INITED
- && waitCount++ < 60) {
- LOG.info("Waiting for RM to start...");
- Thread.sleep(1500);
- }
- if (resourceManagers[index].getServiceState() != STATE.STARTED) {
- // RM could have failed.
- throw new IOException(
- "ResourceManager failed to start. Final state is "
- + resourceManagers[index].getServiceState());
- }
- super.serviceStart();
- } catch (Throwable t) {
- throw new YarnRuntimeException(t);
- }
+ startResourceManager(index);
LOG.info("MiniYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " +
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
+ super.serviceStart();
}
private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
@@ -406,7 +445,6 @@ public class MiniYARNCluster extends Com
waitForAppMastersToFinish(5000);
resourceManagers[index].stop();
}
- super.serviceStop();
if (Shell.WINDOWS) {
// On Windows, clean up the short temporary symlink that was created to
@@ -420,6 +458,7 @@ public class MiniYARNCluster extends Com
testWorkDir.getAbsolutePath());
}
}
+ super.serviceStop();
}
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1556552&r1=1556551&r2=1556552&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Wed Jan 8 14:36:09 2014
@@ -19,21 +19,20 @@
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
-import java.net.InetSocketAddress;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
/**
* This class abstracts away how ApplicationReports are fetched.
@@ -50,16 +49,12 @@ public class AppReportFetcher {
*/
public AppReportFetcher(Configuration conf) {
this.conf = conf;
- YarnRPC rpc = YarnRPC.create(this.conf);
- InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- applicationsManager =
- (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
- rmAddress, this.conf);
- LOG.info("Connected to ResourceManager at " + rmAddress);
+ try {
+ applicationsManager = ClientRMProxy.createRMProxy(conf,
+ ApplicationClientProtocol.class);
+ } catch (IOException e) {
+ throw new YarnRuntimeException(e);
+ }
}
/**
@@ -91,4 +86,10 @@ public class AppReportFetcher {
.getApplicationReport(request);
return response.getApplicationReport();
}
+
+ public void stop() {
+ if (this.applicationsManager != null) {
+ RPC.stopProxy(this.applicationsManager);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1556552&r1=1556551&r2=1556552&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Wed Jan 8 14:36:09 2014
@@ -117,6 +117,9 @@ public class WebAppProxy extends Abstrac
throw new YarnRuntimeException("Error stopping proxy web server",e);
}
}
+ if(this.fetcher != null) {
+ this.fetcher.stop();
+ }
super.serviceStop();
}