You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [3/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Aug 19 23:49:39 2014
@@ -126,6 +126,10 @@ public class YarnConfiguration extends C
public static final String DEFAULT_RM_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT;
+ /** The actual bind address for the RM.*/
+ public static final String RM_BIND_HOST =
+ RM_PREFIX + "bind-host";
+
/** The number of threads used to handle applications manager requests.*/
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
@@ -263,6 +267,17 @@ public class YarnConfiguration extends C
public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
RM_PREFIX + "webapp.spnego-keytab-file";
+ /**
+ * Flag to enable override of the default kerberos authentication filter with
+ * the RM authentication filter to allow authentication using delegation
+ * tokens (falling back to kerberos if the tokens are missing). Only applicable
+ * when the http authentication type is kerberos.
+ */
+ public static final String RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER = RM_PREFIX
+ + "webapp.delegation-token-auth-filter.enabled";
+ public static final boolean DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER =
+ true;
+
/** How long to wait until a container is considered dead.*/
public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
@@ -318,17 +333,24 @@ public class YarnConfiguration extends C
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
+ @Private
+ public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX
+ + "work-preserving-recovery.enabled";
+ @Private
+ public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
+ false;
+
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
public static final String RM_ZK_ADDRESS = RM_ZK_PREFIX + "address";
public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries";
- public static final int DEFAULT_ZK_RM_NUM_RETRIES = 500;
+ public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000;
public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms";
- public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 2000;
+ public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
@@ -527,6 +549,10 @@ public class YarnConfiguration extends C
public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:"
+ DEFAULT_NM_PORT;
+ /** The actual bind address for the NM.*/
+ public static final String NM_BIND_HOST =
+ NM_PREFIX + "bind-host";
+
/** who will execute(launch) the containers.*/
public static final String NM_CONTAINER_EXECUTOR =
NM_PREFIX + "container-executor.class";
@@ -908,7 +934,7 @@ public class YarnConfiguration extends C
PROXY_PREFIX + "address";
public static final int DEFAULT_PROXY_PORT = 9099;
public static final String DEFAULT_PROXY_ADDRESS =
- "0.0.0.0:" + DEFAULT_RM_PORT;
+ "0.0.0.0:" + DEFAULT_PROXY_PORT;
/**
* YARN Service Level Authorization
@@ -1105,7 +1131,7 @@ public class YarnConfiguration extends C
/** The setting that controls whether timeline service is enabled or not. */
public static final String TIMELINE_SERVICE_ENABLED =
TIMELINE_SERVICE_PREFIX + "enabled";
- public static final boolean DEFAULT_TIMELINE_SERVICE_ENABLED = true;
+ public static final boolean DEFAULT_TIMELINE_SERVICE_ENABLED = false;
/** host:port address for timeline service RPC APIs. */
public static final String TIMELINE_SERVICE_ADDRESS =
@@ -1114,6 +1140,10 @@ public class YarnConfiguration extends C
public static final String DEFAULT_TIMELINE_SERVICE_ADDRESS = "0.0.0.0:"
+ DEFAULT_TIMELINE_SERVICE_PORT;
+ /** The listening endpoint for the timeline service application.*/
+ public static final String TIMELINE_SERVICE_BIND_HOST =
+ TIMELINE_SERVICE_PREFIX + "bind-host";
+
/** The number of threads to handle client RPC API requests. */
public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT =
TIMELINE_SERVICE_PREFIX + "handler-thread-count";
@@ -1136,14 +1166,6 @@ public class YarnConfiguration extends C
public static final String DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS =
"0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT;
- /**The kerberos principal to be used for spnego filter for timeline service.*/
- public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY =
- TIMELINE_SERVICE_PREFIX + "webapp.spnego-principal";
-
- /**The kerberos keytab to be used for spnego filter for timeline service.*/
- public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
- TIMELINE_SERVICE_PREFIX + "webapp.spnego-keytab-file";
-
/** Timeline service store class */
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
@@ -1196,6 +1218,14 @@ public class YarnConfiguration extends C
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
1000 * 60 * 5;
+ /** The Kerberos principal for the timeline server.*/
+ public static final String TIMELINE_SERVICE_PRINCIPAL =
+ TIMELINE_SERVICE_PREFIX + "principal";
+
+ /** The Kerberos keytab for the timeline server.*/
+ public static final String TIMELINE_SERVICE_KEYTAB =
+ TIMELINE_SERVICE_PREFIX + "keytab";
+
////////////////////////////////
// Other Configs
////////////////////////////////
@@ -1340,7 +1370,7 @@ public class YarnConfiguration extends C
public static String getClusterId(Configuration conf) {
String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
if (clusterId == null) {
- throw new HadoopIllegalArgumentException("Configuration doesn't specify" +
+ throw new HadoopIllegalArgumentException("Configuration doesn't specify " +
YarnConfiguration.RM_CLUSTER_ID);
}
return clusterId;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java Tue Aug 19 23:49:39 2014
@@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protoc
/**
* This exception is thrown when an ApplicationMaster asks for resources by
- * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
- * to unregister by calling
- * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
- * API without first registering by calling
+ * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)}
+ * without first registering by calling
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
* or if it tries to register more than once.
*/
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto Tue Aug 19 23:49:39 2014
@@ -127,11 +127,11 @@ message ApplicationAttemptStateDataProto
optional string diagnostics = 6 [default = "N/A"];
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
+ optional int32 am_container_exit_status = 9 [default = -1000];
}
-message RMStateVersionProto {
- optional int32 major_version = 1;
- optional int32 minor_version = 2;
+message EpochProto {
+ optional int64 epoch = 1;
}
//////////////////////////////////////////////////////////////////
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Tue Aug 19 23:49:39 2014
@@ -92,7 +92,7 @@ message ContainerReportProto {
optional ResourceProto resource = 2;
optional NodeIdProto node_id = 3;
optional PriorityProto priority = 4;
- optional int64 start_time = 5;
+ optional int64 creation_time = 5;
optional int64 finish_time = 6;
optional string diagnostics_info = 7 [default = "N/A"];
optional string log_url = 8;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Aug 19 23:49:39 2014
@@ -85,6 +85,7 @@ message AllocateResponseProto {
repeated NMTokenProto nm_tokens = 9;
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
+ optional hadoop.common.TokenProto am_rm_token = 12;
}
//////////////////////////////////////////////////////
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml Tue Aug 19 23:49:39 2014
@@ -38,24 +38,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Aug 19 23:49:39 2014
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
@@ -208,7 +208,8 @@ public class ApplicationMaster {
// App Master configuration
// No. of containers to run shell command on
- private int numTotalContainers = 1;
+ @VisibleForTesting
+ protected int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
private int containerMemory = 10;
// VirtualCores to request for the container on which the shell command will run
@@ -522,6 +523,8 @@ public class ApplicationMaster {
+ appAttemptID.toString(), e);
}
+ // Note: the Credentials, Token, UserGroupInformation and DataOutputBuffer
+ // classes are marked as LimitedPrivate
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
@@ -592,8 +595,8 @@ public class ApplicationMaster {
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
- LOG.info("Received " + previousAMRunningContainers.size()
- + " previous AM's running containers on AM registration.");
+ LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+ + " previous attempts' running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
int numTotalContainersToRequest =
@@ -608,7 +611,7 @@ public class ApplicationMaster {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
- numRequestedContainers.set(numTotalContainersToRequest);
+ numRequestedContainers.set(numTotalContainers);
try {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END);
@@ -687,7 +690,7 @@ public class ApplicationMaster {
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID="
+ LOG.info(appAttemptID + " got container status for containerID="
+ containerStatus.getContainerId() + ", state="
+ containerStatus.getState() + ", exitStatus="
+ containerStatus.getExitStatus() + ", diagnostics="
@@ -900,11 +903,6 @@ public class ApplicationMaster {
public void run() {
LOG.info("Setting up container launch container for containerid="
+ container.getId());
- ContainerLaunchContext ctx = Records
- .newRecord(ContainerLaunchContext.class);
-
- // Set the environment
- ctx.setEnvironment(shellEnv);
// Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -935,16 +933,13 @@ public class ApplicationMaster {
return;
}
- LocalResource shellRsrc = Records.newRecord(LocalResource.class);
- shellRsrc.setType(LocalResourceType.FILE);
- shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ URL yarnUrl = null;
try {
- shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
- renamedScriptPath.toString())));
+ yarnUrl = ConverterUtils.getYarnUrlFromURI(
+ new URI(renamedScriptPath.toString()));
} catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + renamedScriptPath, e);
-
// A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container
// so we should release it.
@@ -953,13 +948,13 @@ public class ApplicationMaster {
numFailedContainers.incrementAndGet();
return;
}
- shellRsrc.setTimestamp(shellScriptPathTimestamp);
- shellRsrc.setSize(shellScriptPathLen);
+ LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ shellScriptPathLen, shellScriptPathTimestamp);
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
ExecShellStringPath, shellRsrc);
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
}
- ctx.setLocalResources(localResources);
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@@ -986,16 +981,18 @@ public class ApplicationMaster {
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
- ctx.setCommands(commands);
- // Set up tokens for the container too. Today, for normal shell commands,
- // the container in distribute-shell doesn't need any tokens. We are
- // populating them mainly for NodeManagers to be able to download any
- // files in the distributed file-system. The tokens are otherwise also
- // useful in cases, for e.g., when one is running a "hadoop dfs" command
- // inside the distributed shell.
- ctx.setTokens(allTokens.duplicate());
+ // Set up ContainerLaunchContext, setting local resource, environment,
+ // command and token for constructor.
+ // Note for tokens: Set up tokens for the container too. Today, for normal
+ // shell commands, the container in distribute-shell doesn't need any
+ // tokens. We are populating them mainly for NodeManagers to be able to
+ // download any files in the distributed file-system. The tokens are
+ // otherwise also useful in cases, for e.g., when one is running a
+ // "hadoop dfs" command inside the distributed shell.
+ ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+ localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
}
@@ -1024,15 +1021,13 @@ public class ApplicationMaster {
// setup requirements for hosts
// using * as any host will do for the distributed shell app
// set the priority for the request
- Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
- pri.setPriority(requestPriority);
+ Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
// For now, memory and CPU are supported so we set memory and cpu requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(containerMemory);
- capability.setVirtualCores(containerVirtualCores);
+ Resource capability = Resource.newInstance(containerMemory,
+ containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
@@ -1059,8 +1054,8 @@ public class ApplicationMaster {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
- entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
- .toString());
+ entity.addPrimaryFilter("user",
+ UserGroupInformation.getCurrentUser().getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1076,8 +1071,8 @@ public class ApplicationMaster {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
- entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
- .toString());
+ entity.addPrimaryFilter("user",
+ UserGroupInformation.getCurrentUser().getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1094,8 +1089,8 @@ public class ApplicationMaster {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(appAttemptId);
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
- entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
- .toString());
+ entity.addPrimaryFilter("user",
+ UserGroupInformation.getCurrentUser().getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Tue Aug 19 23:49:39 2014
@@ -456,9 +456,6 @@ public class Client {
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
@@ -508,8 +505,6 @@ public class Client {
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
localResources, StringUtils.join(shellArgs, " "));
}
- // Set local resource info into app master container launch context
- amContainer.setLocalResources(localResources);
// Set the necessary security tokens as needed
//amContainer.setContainerTokens(containerToken);
@@ -550,8 +545,6 @@ public class Client {
env.put("CLASSPATH", classPathEnv.toString());
- amContainer.setEnvironment(env);
-
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@@ -587,14 +580,15 @@ public class Client {
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
- amContainer.setCommands(commands);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+ localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(amMemory);
- capability.setVirtualCores(amVCores);
+ Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application
@@ -603,6 +597,7 @@ public class Client {
// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
+ // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
@@ -627,9 +622,8 @@ public class Client {
appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master
- Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
- pri.setPriority(amPriority);
+ Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java Tue Aug 19 23:49:39 2014
@@ -36,9 +36,11 @@ public class TestDSFailedAppMaster exten
if (appAttemptID.getAttemptId() == 2) {
// should reuse the earlier running container, so numAllocatedContainers
// should be set to 1. And should ask no more containers, so
- // numRequestedContainers should be set to 0.
+ // numRequestedContainers should be the same as numTotalContainers.
+ // The only container is the container requested by the AM in the first
+ // attempt.
if (numAllocatedContainers.get() != 1
- || numRequestedContainers.get() != 0) {
+ || numRequestedContainers.get() != numTotalContainers) {
LOG.info("NumAllocatedContainers is " + numAllocatedContainers.get()
+ " and NumRequestedContainers is " + numAllocatedContainers.get()
+ ".Application Master failed. exiting");
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Tue Aug 19 23:49:39 2014
@@ -26,13 +26,13 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
+import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -73,6 +73,7 @@ public class TestDistributedShell {
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
conf.set("yarn.log.dir", "target");
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(
TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
@@ -168,7 +169,9 @@ public class TestDistributedShell {
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
String hostName = NetUtils.getHostname();
+
boolean verified = false;
+ String errorMessage = "";
while(!verified) {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.size() == 0 ) {
@@ -176,15 +179,22 @@ public class TestDistributedShell {
continue;
}
ApplicationReport appReport = apps.get(0);
- if (appReport.getHost().startsWith(hostName)
- && appReport.getRpcPort() == -1) {
+ if(appReport.getHost().equals("N/A")) {
+ Thread.sleep(10);
+ continue;
+ }
+ errorMessage =
+ "Expected host name to start with '" + hostName + "', was '"
+ + appReport.getHost() + "'. Expected rpc port to be '-1', was '"
+ + appReport.getRpcPort() + "'.";
+ if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
verified = true;
}
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
break;
}
}
- Assert.assertTrue(verified);
+ Assert.assertTrue(errorMessage, verified);
t.join();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get());
@@ -211,6 +221,64 @@ public class TestDistributedShell {
.toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
}
+ /*
+ * NetUtils.getHostname() returns a string in the form "hostname/ip".
+ * Sometimes the hostname we get is the FQDN and sometimes the short name. In
+ * addition, on machines with multiple network interfaces, it runs any one of
+ * the ips. The function below compares the returns values for
+ * NetUtils.getHostname() accounting for the conditions mentioned.
+ */
+ private boolean checkHostname(String appHostname) throws Exception {
+
+ String hostname = NetUtils.getHostname();
+ if (hostname.equals(appHostname)) {
+ return true;
+ }
+
+ Assert.assertTrue("Unknown format for hostname " + appHostname,
+ appHostname.contains("/"));
+ Assert.assertTrue("Unknown format for hostname " + hostname,
+ hostname.contains("/"));
+
+ String[] appHostnameParts = appHostname.split("/");
+ String[] hostnameParts = hostname.split("/");
+
+ return (compareFQDNs(appHostnameParts[0], hostnameParts[0]) && checkIPs(
+ hostnameParts[0], hostnameParts[1], appHostnameParts[1]));
+ }
+
+ private boolean compareFQDNs(String appHostname, String hostname)
+ throws Exception {
+ if (appHostname.equals(hostname)) {
+ return true;
+ }
+ String appFQDN = InetAddress.getByName(appHostname).getCanonicalHostName();
+ String localFQDN = InetAddress.getByName(hostname).getCanonicalHostName();
+ return appFQDN.equals(localFQDN);
+ }
+
+ private boolean checkIPs(String hostname, String localIP, String appIP)
+ throws Exception {
+
+ if (localIP.equals(appIP)) {
+ return true;
+ }
+ boolean appIPCheck = false;
+ boolean localIPCheck = false;
+ InetAddress[] addresses = InetAddress.getAllByName(hostname);
+ for (InetAddress ia : addresses) {
+ if (ia.getHostAddress().equals(appIP)) {
+ appIPCheck = true;
+ continue;
+ }
+ if (ia.getHostAddress().equals(localIP)) {
+ localIPCheck = true;
+ }
+ }
+ return (appIPCheck && localIPCheck);
+
+ }
+
@Test(timeout=90000)
public void testDSRestartWithPreviousRunningContainers() throws Exception {
String[] args = {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Tue Aug 19 23:49:39 2014
@@ -50,24 +50,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml Tue Aug 19 23:49:39 2014
@@ -35,24 +35,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
@@ -75,14 +57,6 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
@@ -123,6 +97,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClient.class);
/**
* Create a new instance of AMRMClient.
@@ -207,14 +211,21 @@ public abstract class AMRMClient<T exten
/**
* Request additional containers and receive new container allocations.
- * Requests made via <code>addContainerRequest</code> are sent to the
- * <code>ResourceManager</code>. New containers assigned to the master are
- * retrieved. Status of completed containers and node health updates are
- * also retrieved.
- * This also doubles up as a heartbeat to the ResourceManager and must be
- * made periodically.
- * The call may not always return any new allocations of containers.
- * App should not make concurrent allocate requests. May cause request loss.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are also
+ * retrieved. This also doubles up as a heartbeat to the ResourceManager and
+ * must be made periodically. The call may not always return any new
+ * allocations of containers. App should not make concurrent allocate
+ * requests. May cause request loss.
+ *
+ * <p>
+ * Note : If the user has not removed container requests that have already
+ * been satisfied, then the re-register may end up sending the entire
+ * container requests to the RM (including matched requests). Which would mean
+ * the RM could end up giving it a lot of new allocated containers.
+ * </p>
+ *
* @param progressIndicator Indicates progress made by the master
* @return the response of the allocate request
* @throws YarnException
@@ -329,4 +340,63 @@ public abstract class AMRMClient<T exten
return nmTokenCache;
}
+ /**
+ * Wait for <code>check</code> to return true for each 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+ * the message "waiting in main loop" for each <code>logInterval</code> times
+ * iteration to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval interval to log for each
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Tue Aug 19 23:49:39 2014
@@ -18,11 +18,15 @@
package org.apache.hadoop.yarn.client.api.async;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -90,6 +94,7 @@ import com.google.common.annotations.Vis
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest>
extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
protected final AMRMClient<T> client;
protected final CallbackHandler handler;
@@ -189,6 +194,65 @@ extends AbstractService {
*/
public abstract int getClusterNodeCount();
+ /**
+ * Wait for <code>check</code> to return true for each 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ };
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+ * the message "waiting in main loop" for each <code>logInterval</code> times
+ * iteration to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval interval to log for each
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
public interface CallbackHandler {
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Tue Aug 19 23:49:39 2014
@@ -234,8 +234,7 @@ extends AMRMClientAsync<T> {
while (true) {
try {
responseQueue.put(response);
- if (response.getAMCommand() == AMCommand.AM_RESYNC
- || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+ if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
return;
}
break;
@@ -280,7 +279,6 @@ extends AMRMClientAsync<T> {
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
- case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -47,20 +49,25 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -77,10 +84,18 @@ public class AMRMClientImpl<T extends Co
private int lastResponseId = 0;
+ protected String appHostName;
+ protected int appHostPort;
+ protected String appTrackingUrl;
+
protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ // blacklistedNodes is required for keeping history of blacklisted nodes that
+ // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get
+ // current blacklisted nodes and send back to RM.
+ protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
@@ -150,6 +165,10 @@ public class AMRMClientImpl<T extends Co
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+ // pendingRelease holds history or release requests.request is removed only if
+ // RM sends completedContainer.
+ // How it different from release? --> release is for per allocate() request.
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@@ -185,19 +204,27 @@ public class AMRMClientImpl<T extends Co
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
Preconditions.checkArgument(appHostName != null,
"The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
+ " should be any integers larger than or equal to -1");
- // do this only once ???
+
+ return registerApplicationMaster();
+ }
+
+ private RegisterApplicationMasterResponse registerApplicationMaster()
+ throws YarnException, IOException {
RegisterApplicationMasterRequest request =
- RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
- appTrackingUrl);
+ RegisterApplicationMasterRequest.newInstance(this.appHostName,
+ this.appHostPort, this.appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
-
synchronized (this) {
- if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+ lastResponseId = 0;
+ if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
@@ -249,6 +276,25 @@ public class AMRMClientImpl<T extends Co
}
allocateResponse = rmClient.allocate(allocateRequest);
+ if (isResyncCommand(allocateResponse)) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ synchronized (this) {
+ release.addAll(this.pendingRelease);
+ blacklistAdditions.addAll(this.blacklistedNodes);
+ for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
+ .values()) {
+ for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
+ for (ResourceRequestInfo request : capabalities.values()) {
+ addResourceRequestToAsk(request.remoteRequest);
+ }
+ }
+ }
+ }
+ // re register with RM
+ registerApplicationMaster();
+ return allocate(progressIndicator);
+ }
synchronized (this) {
// update these on successful RPC
@@ -258,6 +304,14 @@ public class AMRMClientImpl<T extends Co
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
+ if (allocateResponse.getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAMRMToken());
+ }
+ if (!pendingRelease.isEmpty()
+ && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
+ removePendingReleaseRequests(allocateResponse
+ .getCompletedContainersStatuses());
+ }
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -288,6 +342,18 @@ public class AMRMClientImpl<T extends Co
return allocateResponse;
}
+ protected void removePendingReleaseRequests(
+ List<ContainerStatus> completedContainersStatuses) {
+ for (ContainerStatus containerStatus : completedContainersStatuses) {
+ pendingRelease.remove(containerStatus.getContainerId());
+ }
+ }
+
+ private boolean isResyncCommand(AllocateResponse allocateResponse) {
+ return allocateResponse.getAMCommand() != null
+ && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+ }
+
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -324,6 +390,12 @@ public class AMRMClientImpl<T extends Co
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for application"
+ " to be removed from RMStateStore");
+ } catch (ApplicationMasterNotRegisteredException e) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ // re register with RM
+ registerApplicationMaster();
+ unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
}
@@ -414,6 +486,7 @@ public class AMRMClientImpl<T extends Co
public synchronized void releaseAssignedContainer(ContainerId containerId) {
Preconditions.checkArgument(containerId != null,
"ContainerId can not be null.");
+ pendingRelease.add(containerId);
release.add(containerId);
}
@@ -655,6 +728,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
+ this.blacklistedNodes.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
@@ -662,6 +736,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
+ this.blacklistedNodes.removeAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
@@ -675,4 +750,16 @@ public class AMRMClientImpl<T extends Co
"blacklistRemovals in updateBlacklist.");
}
}
+
+ private void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+ new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
+ .getIdentifier().array(), token.getPassword().array(), new Text(
+ token.getKind()), new Text(token.getService()));
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ currentUGI = UserGroupInformation.getLoginUser();
+ }
+ currentUGI.addToken(amrmToken);
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -29,8 +30,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
@@ -64,6 +70,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -74,6 +81,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AHSClient;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -82,8 +90,10 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -97,8 +107,11 @@ public class YarnClientImpl extends Yarn
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
private long asyncApiPollTimeoutMillis;
- protected AHSClient historyClient;
+ private AHSClient historyClient;
private boolean historyServiceEnabled;
+ protected TimelineClient timelineClient;
+ protected Text timelineService;
+ protected boolean timelineServiceEnabled;
private static final String ROOT = "root";
@@ -126,10 +139,17 @@ public class YarnClientImpl extends Yarn
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
historyServiceEnabled = true;
- historyClient = AHSClientImpl.createAHSClient();
- historyClient.init(getConfig());
+ historyClient = AHSClient.createAHSClient();
+ historyClient.init(conf);
}
+ if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+ timelineServiceEnabled = true;
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ timelineService = TimelineUtils.buildTimelineTokenService(conf);
+ }
super.serviceInit(conf);
}
@@ -141,6 +161,9 @@ public class YarnClientImpl extends Yarn
if (historyServiceEnabled) {
historyClient.start();
}
+ if (timelineServiceEnabled) {
+ timelineClient.start();
+ }
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@@ -155,6 +178,9 @@ public class YarnClientImpl extends Yarn
if (historyServiceEnabled) {
historyClient.stop();
}
+ if (timelineServiceEnabled) {
+ timelineClient.stop();
+ }
super.serviceStop();
}
@@ -189,6 +215,12 @@ public class YarnClientImpl extends Yarn
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
+ // Automatically add the timeline DT into the CLC
+ // Only when the security and the timeline service are both enabled
+ if (isSecurityEnabled() && timelineServiceEnabled) {
+ addTimelineDelegationToken(appContext.getAMContainerSpec());
+ }
+
//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
rmClient.submitApplication(request);
@@ -238,6 +270,48 @@ public class YarnClientImpl extends Yarn
return applicationId;
}
+ private void addTimelineDelegationToken(
+ ContainerLaunchContext clc) throws YarnException, IOException {
+ org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> timelineDelegationToken =
+ timelineClient.getDelegationToken(
+ UserGroupInformation.getCurrentUser().getUserName());
+ if (timelineDelegationToken == null) {
+ return;
+ }
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ ByteBuffer tokens = clc.getTokens();
+ if (tokens != null) {
+ dibb.reset(tokens);
+ credentials.readTokenStorageStream(dibb);
+ tokens.rewind();
+ }
+ // If the timeline delegation token is already in the CLC, no need to add
+ // one more
+ for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials
+ .getAllTokens()) {
+ TokenIdentifier tokenIdentifier = token.decodeIdentifier();
+ if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) {
+ return;
+ }
+ }
+ credentials.addToken(timelineService, timelineDelegationToken);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add timeline delegation token into credentials: "
+ + timelineDelegationToken);
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ clc.setTokens(tokens);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected boolean isSecurityEnabled() {
+ return UserGroupInformation.isSecurityEnabled();
+ }
+
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Tue Aug 19 23:49:39 2014
@@ -304,7 +304,7 @@ public class ApplicationCLI extends Yarn
containerReportStr.print("\tContainer-Id : ");
containerReportStr.println(containerReport.getContainerId());
containerReportStr.print("\tStart-Time : ");
- containerReportStr.println(containerReport.getStartTime());
+ containerReportStr.println(containerReport.getCreationTime());
containerReportStr.print("\tFinish-Time : ");
containerReportStr.println(containerReport.getFinishTime());
containerReportStr.print("\tState : ");
@@ -525,7 +525,7 @@ public class ApplicationCLI extends Yarn
"Finish Time", "State", "Host", "LOG-URL");
for (ContainerReport containerReport : appsReport) {
writer.printf(CONTAINER_PATTERN, containerReport.getContainerId(),
- containerReport.getStartTime(), containerReport.getFinishTime(),
+ containerReport.getCreationTime(), containerReport.getFinishTime(),
containerReport.getContainerState(), containerReport
.getAssignedNode(), containerReport.getLogUrl());
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java Tue Aug 19 23:49:39 2014
@@ -28,7 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
@@ -267,6 +267,7 @@ public abstract class ProtocolHATestBase
protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
boolean overrideRTS, boolean overrideApplicationMasterService)
throws Exception {
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster =
new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.util.ArrayList;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@@ -54,11 +54,9 @@ public class TestApplicationMasterServic
amClient = ClientRMProxy
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
- AMRMTokenIdentifier id =
- new AMRMTokenIdentifier(attemptId);
Token<AMRMTokenIdentifier> appToken =
- new Token<AMRMTokenIdentifier>(id, this.cluster.getResourceManager()
- .getRMContext().getAMRMTokenSecretManager());
+ this.cluster.getResourceManager().getRMContext()
+ .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser()
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java Tue Aug 19 23:49:39 2014
@@ -305,7 +305,8 @@ public class TestRMAdminCLI {
testError(new String[] { "-help", "-getGroups" },
"Usage: yarn rmadmin [-getGroups [username]]", dataErr, 0);
testError(new String[] { "-help", "-transitionToActive" },
- "Usage: yarn rmadmin [-transitionToActive <serviceId>]", dataErr, 0);
+ "Usage: yarn rmadmin [-transitionToActive <serviceId>" +
+ " [--forceactive]]", dataErr, 0);
testError(new String[] { "-help", "-transitionToStandby" },
"Usage: yarn rmadmin [-transitionToStandby <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-getServiceState" },
@@ -332,9 +333,9 @@ public class TestRMAdminCLI {
"yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
- " [username]] [-help [cmd]] [-transitionToActive <serviceId>]" +
- " [-transitionToStandby <serviceId>] [-failover [--forcefence]" +
- " [--forceactive] <serviceId> <serviceId>] " +
+ " [username]] [-help [cmd]] [-transitionToActive <serviceId>" +
+ " [--forceactive]] [-transitionToStandby <serviceId>] [-failover" +
+ " [--forcefence] [--forceactive] <serviceId> <serviceId>] " +
"[-getServiceState <serviceId>] [-checkHealth <serviceId>]"));
} finally {
System.setOut(oldOutPrintStream);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java Tue Aug 19 23:49:39 2014
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.junit.After;
import org.junit.Assert;
@@ -169,6 +173,7 @@ public class TestRMFailover extends Clie
verifyConnections();
}
+ @SuppressWarnings("unchecked")
@Test
public void testAutomaticFailover()
throws YarnException, InterruptedException, IOException {
@@ -186,6 +191,25 @@ public class TestRMFailover extends Clie
failover();
verifyConnections();
+
+ // Make the current Active handle an RMFatalEvent,
+ // so it transitions to standby.
+ ResourceManager rm = cluster.getResourceManager(
+ cluster.getActiveRMIndex());
+ RMFatalEvent event =
+ new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
+ "Fake RMFatalEvent");
+ rm.getRMContext().getDispatcher().getEventHandler().handle(event);
+ int maxWaitingAttempts = 2000;
+ while (maxWaitingAttempts-- > 0 ) {
+ if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
+ break;
+ }
+ Thread.sleep(1);
+ }
+ Assert.assertEquals("RM didn't transition to Standby ",
+ HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
+ verifyConnections();
}
@Test
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA ext
// make sure registerNodeManager works when failover happens
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
- YarnVersionInfo.getVersion(), null);
+ YarnVersionInfo.getVersion(), null, null);
resourceTracker.registerNodeManager(request);
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.async.impl;
+import com.google.common.base.Supplier;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
@@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
when(client.allocate(anyFloat())).thenThrow(ex);
- AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
@@ -203,43 +204,35 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
- @Test//(timeout=10000)
- public void testAMRMClientAsyncReboot() throws Exception {
+ @Test (timeout = 10000)
+ public void testAMRMClientAsyncShutDown() throws Exception {
Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-
- final AllocateResponse rebootResponse = createAllocateResponse(
+
+ final AllocateResponse shutDownResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
- rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
- when(client.allocate(anyFloat())).thenReturn(rebootResponse);
-
- AMRMClientAsync<ContainerRequest> asyncClient =
- AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+ when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
-
- synchronized (callbackHandler.notifier) {
- asyncClient.registerApplicationMaster("localhost", 1234, null);
- while(callbackHandler.reboot == false) {
- try {
- callbackHandler.notifier.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+ Thread.sleep(50);
+
+ verify(client, times(1)).allocate(anyFloat());
asyncClient.stop();
- // stopping should have joined all threads and completed all callbacks
- Assert.assertTrue(callbackHandler.callbackCount == 0);
}
-
+
@Test (timeout = 10000)
- public void testAMRMClientAsyncShutDown() throws Exception {
+ public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
Configuration conf = new Configuration();
- TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ final TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
@@ -253,9 +246,19 @@ public class TestAMRMClientAsync {
asyncClient.init(conf);
asyncClient.start();
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.reboot;
+ }
+ };
+
asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
- Thread.sleep(50);
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertTrue(callbackHandler.callbackCount == 0);
verify(client, times(1)).allocate(anyFloat());
asyncClient.stop();
@@ -295,6 +298,40 @@ public class TestAMRMClientAsync {
}
}
+ @Test (timeout = 5000)
+ public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor()
+ throws YarnException, IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ List<ContainerStatus> completed = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ final AllocateResponse response = createAllocateResponse(completed,
+ new ArrayList<Container>(), null);
+
+ when(client.allocate(anyFloat())).thenReturn(response);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ callbackHandler.asynClient = asyncClient;
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.notify;
+ }
+ };
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
+ Assert.assertTrue(checker.get());
+ }
+
void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
InterruptedException, YarnException, IOException {
Configuration conf = new Configuration();
@@ -375,7 +412,7 @@ public class TestAMRMClientAsync {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
Exception savedException = null;
- boolean reboot = false;
+ volatile boolean reboot = false;
Object notifier = new Object();
int callbackCount = 0;
@@ -465,7 +502,7 @@ public class TestAMRMClientAsync {
@SuppressWarnings("rawtypes")
AMRMClientAsync asynClient;
boolean stop = true;
- boolean notify = false;
+ volatile boolean notify = false;
boolean throwOutException = false;
@Override