You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/08/12 13:02:39 UTC
svn commit: r1617450 [2/2] - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s...
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Tue Aug 12 11:02:38 2014
@@ -29,8 +29,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -59,6 +61,40 @@ public abstract class NMStateStoreServic
}
}
+ /**
+  * Lifecycle stage recorded for a container in the state store: its start
+  * request was stored, it was launched, or it completed.
+  */
+ public enum RecoveredContainerStatus {
+   REQUESTED, LAUNCHED, COMPLETED
+ }
+
+ /**
+  * Per-container state read back from the state store during recovery.
+  * Store implementations fill in the package-private fields directly; any
+  * aspect that was never recorded keeps its default: no exit code
+  * ({@link ContainerExitStatus#INVALID}), not killed, empty diagnostics.
+  */
+ public static class RecoveredContainerState {
+   RecoveredContainerStatus status;
+   int exitCode = ContainerExitStatus.INVALID;
+   boolean killed;
+   String diagnostics = "";
+   StartContainerRequest startRequest;
+
+   /** @return the most recently recorded lifecycle status */
+   public RecoveredContainerStatus getStatus() {
+     return this.status;
+   }
+
+   /** @return the recorded exit code, or ContainerExitStatus.INVALID if none */
+   public int getExitCode() {
+     return this.exitCode;
+   }
+
+   /** @return true if a kill request was recorded for the container */
+   public boolean getKilled() {
+     return this.killed;
+   }
+
+   /** @return the recorded diagnostics text (empty if none was recorded) */
+   public String getDiagnostics() {
+     return this.diagnostics;
+   }
+
+   /** @return the stored container start request */
+   public StartContainerRequest getStartRequest() {
+     return this.startRequest;
+   }
+ }
+
public static class LocalResourceTrackerState {
List<LocalizedResourceProto> localizedResources =
new ArrayList<LocalizedResourceProto>();
@@ -176,20 +212,101 @@ public abstract class NMStateStoreServic
}
+ /**
+ * Load the state of applications.
+ * @return recovered state for applications
+ * @throws IOException if the state could not be loaded from the store
+ */
public abstract RecoveredApplicationsState loadApplicationsState()
throws IOException;
+ /**
+ * Record the start of an application.
+ * @param appId the application ID
+ * @param p state to store for the application, as a
+ *          ContainerManagerApplicationProto
+ * @throws IOException if the record could not be written
+ */
public abstract void storeApplication(ApplicationId appId,
ContainerManagerApplicationProto p) throws IOException;
+ /**
+ * Record that an application has finished. Removing the application's
+ * records is a separate operation ({@link #removeApplication}).
+ * @param appId the application ID
+ * @throws IOException if the record could not be written
+ */
public abstract void storeFinishedApplication(ApplicationId appId)
throws IOException;
+ /**
+ * Remove records corresponding to an application.
+ * @param appId the application ID
+ * @throws IOException if the records could not be removed
+ */
public abstract void removeApplication(ApplicationId appId)
throws IOException;
/**
+ * Load the state of containers.
+ * @return recovered state for containers, one entry per stored container
+ *         (an empty list when nothing has been stored)
+ * @throws IOException if the state could not be loaded from the store
+ */
+ public abstract List<RecoveredContainerState> loadContainersState()
+ throws IOException;
+
+ /**
+ * Record a container start request. This is the first record stored for a
+ * container. NOTE(review): implementations may reject the other
+ * storeContainer* calls for a container whose start request was never
+ * stored; confirm per implementation.
+ * @param containerId the container ID
+ * @param startRequest the container start request
+ * @throws IOException if the record could not be written
+ */
+ public abstract void storeContainer(ContainerId containerId,
+ StartContainerRequest startRequest) throws IOException;
+
+ /**
+ * Record that a container has been launched.
+ * @param containerId the container ID
+ * @throws IOException if the record could not be written
+ */
+ public abstract void storeContainerLaunched(ContainerId containerId)
+ throws IOException;
+
+ /**
+ * Record that a container has completed.
+ * @param containerId the container ID
+ * @param exitCode the exit code from the container; until one is stored,
+ *        recovered state reports ContainerExitStatus#INVALID
+ * @throws IOException if the record could not be written
+ */
+ public abstract void storeContainerCompleted(ContainerId containerId,
+ int exitCode) throws IOException;
+
+ /**
+ * Record a request to kill a container. Only the request is recorded; on
+ * recovery it is intended to surface via RecoveredContainerState#getKilled().
+ * @param containerId the container ID
+ * @throws IOException if the record could not be written
+ */
+ public abstract void storeContainerKilled(ContainerId containerId)
+ throws IOException;
+
+ /**
+ * Record diagnostics for a container.
+ * @param containerId the container ID
+ * @param diagnostics the container diagnostics text
+ * @throws IOException if the record could not be written
+ */
+ public abstract void storeContainerDiagnostics(ContainerId containerId,
+ StringBuilder diagnostics) throws IOException;
+
+ /**
+ * Remove records corresponding to a container.
+ * @param containerId the container ID
+ * @throws IOException if the records could not be removed
+ */
+ public abstract void removeContainer(ContainerId containerId)
+ throws IOException;
+
+
+ /**
* Load the state of localized resources
* @return recovered localized resource state
* @throws IOException
@@ -230,43 +347,111 @@ public abstract class NMStateStoreServic
ApplicationId appId, Path localPath) throws IOException;
+ /**
+ * Load the state of the deletion service.
+ * @return recovered deletion service state
+ * @throws IOException if the state could not be loaded from the store
+ */
public abstract RecoveredDeletionServiceState loadDeletionServiceState()
throws IOException;
+ /**
+ * Record a deletion task.
+ * @param taskId the deletion task ID
+ * @param taskProto the deletion task protobuf
+ * @throws IOException if the record could not be written
+ */
public abstract void storeDeletionTask(int taskId,
DeletionServiceDeleteTaskProto taskProto) throws IOException;
+ /**
+ * Remove records corresponding to a deletion task.
+ * @param taskId the deletion task ID
+ * @throws IOException if the records could not be removed
+ */
public abstract void removeDeletionTask(int taskId) throws IOException;
+ /**
+ * Load the state of NM tokens.
+ * @return recovered state of NM tokens
+ * @throws IOException if the state could not be loaded from the store
+ */
public abstract RecoveredNMTokensState loadNMTokensState()
throws IOException;
+ /**
+ * Record the current NM token master key.
+ * @param key the master key
+ * @throws IOException if the record could not be written
+ */
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the previous NM token master key.
+ * @param key the previous master key
+ * @throws IOException if the record could not be written
+ */
public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record a master key corresponding to an application attempt.
+ * @param attempt the application attempt ID
+ * @param key the master key
+ * @throws IOException if the record could not be written
+ */
public abstract void storeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt, MasterKey key) throws IOException;
+ /**
+ * Remove a master key corresponding to an application attempt.
+ * @param attempt the application attempt ID
+ * @throws IOException if the record could not be removed
+ */
public abstract void removeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt) throws IOException;
+ /**
+ * Load the state of container tokens.
+ * @return recovered state of container tokens
+ * @throws IOException if the state could not be loaded from the store
+ */
public abstract RecoveredContainerTokensState loadContainerTokensState()
throws IOException;
+ /**
+ * Record the current container token master key.
+ * @param key the master key
+ * @throws IOException if the record could not be written
+ */
public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the previous container token master key.
+ * @param key the previous master key
+ * @throws IOException if the record could not be written
+ */
public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the expiration time for a container token.
+ * @param containerId the container ID
+ * @param expirationTime the container token expiration time
+ *        (NOTE(review): units not visible here; presumably epoch
+ *        milliseconds, so confirm)
+ * @throws IOException if the record could not be written
+ */
public abstract void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException;
+ /**
+ * Remove records for a container token.
+ * @param containerId the container ID
+ * @throws IOException if the records could not be removed
+ */
public abstract void removeContainerToken(ContainerId containerId)
throws IOException;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Tue Aug 12 11:02:38 2014
@@ -33,6 +33,7 @@
#include <limits.h>
#include <sys/stat.h>
#include <sys/mount.h>
+#include <sys/wait.h>
/*
 * Default minimum user ID. NOTE(review): presumably the lower bound on UIDs
 * allowed to run containers when no minimum is configured; confirm at its
 * use sites, which are not visible here.
 */
static const int DEFAULT_MIN_USERID = 1000;
@@ -245,6 +246,85 @@ static int write_pid_to_file_as_nm(const
}
/**
+ * Atomically write the exit code of the container into the exit code file.
+ * The value is written to "<exit_code_file>.tmp" (created exclusively with
+ * mode 0700) and then rename()d into place, so a reader sees either no file
+ * or the complete exit code.
+ * exit_code_file: Path to exit code file where exit code needs to be written
+ * exit_code: the exit code to record, written as decimal text
+ * Returns 0 on success and -1 on failure. If the temp file was created
+ * before the failure, it is removed.
+ */
+static int write_exit_code_file(const char* exit_code_file, int exit_code) {
+  char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
+                                     exit_code_file);
+  if (tmp_ecode_file == NULL) {
+    return -1;
+  }
+
+  // create with 700
+  int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
+  if (ecode_fd == -1) {
+    fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file,
+            strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // 21 bytes holds even a 64-bit value in decimal plus the terminating NUL
+  char ecode_buf[21];
+  snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code);
+  size_t ecode_len = strlen(ecode_buf);
+  ssize_t written = write(ecode_fd, ecode_buf, ecode_len);
+  if (written != (ssize_t) ecode_len) {
+    // A short write would publish a truncated exit code, so treat it like an
+    // outright write error. errno is consumed here, before close() runs.
+    fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n",
+            tmp_ecode_file,
+            written == -1 ? strerror(errno) : "short write");
+    close(ecode_fd);
+    // don't leave the partial temp file behind (O_EXCL would block a retry)
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // close() can report deferred write errors; check before publishing
+  if (close(ecode_fd) != 0) {
+    fprintf(LOGFILE, "Failed to close exit code file %s - %s\n",
+            tmp_ecode_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // rename temp file to actual exit code file
+  // use rename as atomic
+  if (rename(tmp_ecode_file, exit_code_file)) {
+    fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n",
+            tmp_ecode_file, exit_code_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  free(tmp_ecode_file);
+  return 0;
+}
+
+/**
+ * Reap the forked container process and persist its exit status.
+ * The effective user is first switched to nm_uid/nm_gid. The function then
+ * waits for pid, retrying while waitpid is interrupted (EINTR). It converts
+ * the wait status to an exit code: the normal exit status, 0x80 + the signal
+ * number if the process was killed by a signal, or -1 if neither applies.
+ * The code is then written with write_exit_code_file.
+ * Returns that exit code, or -1 if switching user, waiting, or writing the
+ * exit code file fails.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+
+  int child_status = -1;
+  int waitpid_result;
+  for (;;) {
+    waitpid_result = waitpid(pid, &child_status, 0);
+    // keep waiting only when a signal interrupted the call
+    if (waitpid_result != -1 || errno != EINTR) {
+      break;
+    }
+  }
+  if (waitpid_result < 0) {
+    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
+            pid, strerror(errno));
+    return -1;
+  }
+
+  int exit_code = -1;
+  if (WIFEXITED(child_status)) {
+    exit_code = WEXITSTATUS(child_status);
+  } else if (WIFSIGNALED(child_status)) {
+    exit_code = 0x80 + WTERMSIG(child_status);
+  } else {
+    fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
+  }
+
+  return write_exit_code_file(exit_code_file, exit_code) < 0 ? -1 : exit_code;
+}
+
+/**
* Change the real and effective user and group to abandon the super user
* privileges.
*/
@@ -337,6 +417,10 @@ char *get_container_work_directory(const
nm_root, user, app_id, container_id);
}
+/**
+ * Return the path of the exit code file paired with pid_file, i.e.
+ * "<pid_file>.exitcode", or NULL if the string could not be built.
+ * The caller owns the returned string and must free it.
+ */
+char *get_exit_code_file(const char* pid_file) {
+  char *exit_code_path =
+      concatenate("%s.exitcode", "exit_code_file", 1, pid_file);
+  return exit_code_path;
+}
+
/*
 * Build "<work_dir>/<CONTAINER_SCRIPT>", the destination path of the launch
 * script inside the container work directory. Returns NULL on failure.
 */
char *get_container_launcher_file(const char* work_dir) {
  char *launcher_path = concatenate("%s/%s", "container launcher", 2,
                                    work_dir, CONTAINER_SCRIPT);
  return launcher_path;
}
@@ -879,6 +963,8 @@ int launch_container_as_user(const char
int exit_code = -1;
char *script_file_dest = NULL;
char *cred_file_dest = NULL;
+ char *exit_code_file = NULL;
+
script_file_dest = get_container_launcher_file(work_dir);
if (script_file_dest == NULL) {
exit_code = OUT_OF_MEMORY;
@@ -889,6 +975,11 @@ int launch_container_as_user(const char
exit_code = OUT_OF_MEMORY;
goto cleanup;
}
+ exit_code_file = get_exit_code_file(pid_file);
+ if (NULL == exit_code_file) {
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
+ }
// open launch script
int container_file_source = open_file_as_nm(script_name);
@@ -902,6 +993,13 @@ int launch_container_as_user(const char
goto cleanup;
}
+ pid_t child_pid = fork();
+ if (child_pid != 0) {
+ // parent
+ exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+ goto cleanup;
+ }
+
// setsid
pid_t pid = setsid();
if (pid == -1) {
@@ -986,6 +1084,7 @@ int launch_container_as_user(const char
exit_code = 0;
cleanup:
+ free(exit_code_file);
free(script_file_dest);
free(cred_file_dest);
return exit_code;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Aug 12 11:02:38 2014
@@ -201,6 +201,7 @@ public class TestNodeStatusUpdater {
Dispatcher mockDispatcher = mock(Dispatcher.class);
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
+ NMStateStoreService stateStore = new NMNullStateStoreService();
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -226,9 +227,8 @@ public class TestNodeStatusUpdater {
firstContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
- Container container =
- new ContainerImpl(conf, mockDispatcher, launchContext, null,
- mockMetrics, containerToken);
+ Container container = new ContainerImpl(conf, mockDispatcher,
+ stateStore, launchContext, null, mockMetrics, containerToken);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -257,9 +257,8 @@ public class TestNodeStatusUpdater {
secondContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
- Container container =
- new ContainerImpl(conf, mockDispatcher, launchContext, null,
- mockMetrics, containerToken);
+ Container container = new ContainerImpl(conf, mockDispatcher,
+ stateStore, launchContext, null, mockMetrics, containerToken);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -784,7 +783,7 @@ public class TestNodeStatusUpdater {
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
- nodeStatusUpdater.updateStoppedContainersInCache(cId);
+ nodeStatusUpdater.addCompletedContainer(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
long time1 = System.currentTimeMillis();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue Aug 12 11:02:38 2014
@@ -233,7 +233,7 @@ public abstract class BaseContainerManag
protected DeletionService createDeletionService() {
return new DeletionService(exec) {
@Override
- public void delete(String user, Path subDir, Path[] baseDirs) {
+ public void delete(String user, Path subDir, Path... baseDirs) {
// Don't do any deletions.
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
+ ", baseDirs - " + baseDirs);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Tue Aug 12 11:02:38 2014
@@ -191,7 +191,8 @@ public class TestAuxServices {
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
ContainerId.newInstance(attemptId, 1), "", "",
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
- Container container = new ContainerImpl(null, null, null, null, null, cti);
+ Container container = new ContainerImpl(null, null, null, null, null,
+ null, cti);
ContainerId containerId = container.getContainerId();
Resource resource = container.getResource();
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java Tue Aug 12 11:02:38 2014
@@ -80,6 +80,7 @@ public class TestContainerManagerRecover
public void testApplicationRecovery() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
NMStateStoreService stateStore = new NMMemoryStateStoreService();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Aug 12 11:02:38 2014
@@ -780,7 +780,8 @@ public class TestContainer {
}
when(ctxt.getServiceData()).thenReturn(serviceData);
- c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+ c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+ ctxt, null, metrics, identifier);
dispatcher.register(ContainerEventType.class,
new EventHandler<ContainerEvent>() {
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Tue Aug 12 11:02:38 2014
@@ -22,13 +22,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
private Set<ApplicationId> finishedApps;
+ private Map<ContainerId, RecoveredContainerState> containerStates;
private Map<TrackerKey, TrackerState> trackerStates;
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokensState nmTokenState;
@@ -53,6 +57,7 @@ public class NMMemoryStateStoreService e
protected void initStorage(Configuration conf) {
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
finishedApps = new HashSet<ApplicationId>();
+ containerStates = new HashMap<ContainerId, RecoveredContainerState>();
nmTokenState = new RecoveredNMTokensState();
nmTokenState.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>();
@@ -100,6 +105,77 @@ public class NMMemoryStateStoreService e
finishedApps.remove(appId);
}
+ /**
+  * Return a snapshot of every stored container. Each entry is a fresh
+  * RecoveredContainerState, so callers cannot modify the store's state; the
+  * copy is shallow (the StartContainerRequest instance is shared).
+  */
+ @Override
+ public List<RecoveredContainerState> loadContainersState()
+     throws IOException {
+   // return a copy so caller can't modify our state
+   List<RecoveredContainerState> result =
+       new ArrayList<RecoveredContainerState>(containerStates.size());
+   for (RecoveredContainerState rcs : containerStates.values()) {
+     RecoveredContainerState rcsCopy = new RecoveredContainerState();
+     rcsCopy.status = rcs.status;
+     rcsCopy.exitCode = rcs.exitCode;
+     rcsCopy.killed = rcs.killed;
+     rcsCopy.diagnostics = rcs.diagnostics;
+     rcsCopy.startRequest = rcs.startRequest;
+     result.add(rcsCopy);
+   }
+   // return the populated copies; an empty list would hide every container
+   return result;
+ }
+
+ /**
+  * Record a container start request, replacing any state previously stored
+  * for the container. The container starts out in the REQUESTED status.
+  */
+ @Override
+ public void storeContainer(ContainerId containerId,
+     StartContainerRequest startRequest) throws IOException {
+   RecoveredContainerState rcs = new RecoveredContainerState();
+   // mark the stage explicitly so recovery never sees a null status for a
+   // container that was requested but not yet launched
+   rcs.status = RecoveredContainerStatus.REQUESTED;
+   rcs.startRequest = startRequest;
+   containerStates.put(containerId, rcs);
+ }
+
+ /**
+  * Overwrite the stored diagnostics of a known container with the current
+  * contents of the supplied buffer.
+  */
+ @Override
+ public void storeContainerDiagnostics(ContainerId containerId,
+     StringBuilder diagnostics) throws IOException {
+   // the lookup (which may throw) runs before diagnostics.toString()
+   getRecoveredContainerState(containerId).diagnostics =
+       diagnostics.toString();
+ }
+
+ @Override
+ public void storeContainerLaunched(ContainerId containerId)
+ throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ if (rcs.exitCode != ContainerExitStatus.INVALID) {
+ throw new IOException("Container already completed");
+ }
+ rcs.status = RecoveredContainerStatus.LAUNCHED;
+ }
+
+ /** Flag a known container as having had a kill requested. */
+ @Override
+ public void storeContainerKilled(ContainerId containerId)
+     throws IOException {
+   getRecoveredContainerState(containerId).killed = true;
+ }
+
+ /** Record the exit code of a known container and mark it COMPLETED. */
+ @Override
+ public void storeContainerCompleted(ContainerId containerId, int exitCode)
+     throws IOException {
+   RecoveredContainerState state = getRecoveredContainerState(containerId);
+   state.exitCode = exitCode;
+   state.status = RecoveredContainerStatus.COMPLETED;
+ }
+
+ /** Forget a container; removing an unknown container is a no-op. */
+ @Override
+ public void removeContainer(ContainerId containerId) throws IOException {
+   this.containerStates.remove(containerId);
+ }
+
+ /**
+  * Look up the stored state of a container.
+  * @throws IOException if no start request was stored for the container
+  */
+ private RecoveredContainerState getRecoveredContainerState(
+     ContainerId containerId) throws IOException {
+   RecoveredContainerState state = containerStates.get(containerId);
+   if (state != null) {
+     return state;
+   }
+   throw new IOException("No start request for " + containerId);
+ }
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
LocalResourceTrackerState result = new LocalResourceTrackerState();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Tue Aug 12 11:02:38 2014
@@ -25,18 +25,30 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@@ -193,6 +208,115 @@ public class TestNMLeveldbStateStoreServ
}
@Test
+ public void testContainerStorage() throws IOException {
+ // test empty when no state; later checks reload after restartStateStore()
+ List<RecoveredContainerState> recoveredContainers =
+ stateStore.loadContainersState();
+ assertTrue(recoveredContainers.isEmpty());
+ // create a container request, filling every launch context field plus a
+ // container token so the getStartRequest() comparisons are non-trivial
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 4);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 5);
+ LocalResource lrsrc = LocalResource.newInstance(
+ URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+ 1234567890L);
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put("rsrc", lrsrc);
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("somevar", "someval");
+ List<String> containerCmds = new ArrayList<String>();
+ containerCmds.add("somecmd");
+ containerCmds.add("somearg");
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+ serviceData.put("someservice",
+ ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+ ByteBuffer containerTokens =
+ ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>();
+ acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+ acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ localResources, env, containerCmds, serviceData, containerTokens,
+ acls);
+ Resource containerRsrc = Resource.newInstance(1357, 3);
+ ContainerTokenIdentifier containerTokenId =
+ new ContainerTokenIdentifier(containerId, "host", "user",
+ containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
+ 13579);
+ Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+ ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+ "tokenservice");
+ StartContainerRequest containerReq =
+ StartContainerRequest.newInstance(clc, containerToken);
+ // store a container and verify recovered: REQUESTED, exit code INVALID,
+ // not killed, an equal start request, and empty diagnostics
+ stateStore.storeContainer(containerId, containerReq);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ RecoveredContainerState rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertEquals(false, rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+ assertTrue(rcs.getDiagnostics().isEmpty());
+ // launch the container, add some diagnostics, and verify recovered; the
+ // whole accumulated StringBuilder is passed and expected back verbatim
+ StringBuilder diags = new StringBuilder();
+ stateStore.storeContainerLaunched(containerId);
+ diags.append("some diags for container");
+ stateStore.storeContainerDiagnostics(containerId, diags);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertEquals(false, rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(diags.toString(), rcs.getDiagnostics());
+ // mark the container killed, add some more diags, and verify recovered;
+ // the killed flag is set but the recorded status stays LAUNCHED
+ diags.append("some more diags for container");
+ stateStore.storeContainerDiagnostics(containerId, diags);
+ stateStore.storeContainerKilled(containerId);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertTrue(rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(diags.toString(), rcs.getDiagnostics());
+ // add yet more diags, mark container completed, and verify recovered:
+ // status COMPLETED with exit code 21, and the earlier kill is retained
+ diags.append("some final diags");
+ stateStore.storeContainerDiagnostics(containerId, diags);
+ stateStore.storeContainerCompleted(containerId, 21);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
+ assertEquals(21, rcs.getExitCode());
+ assertTrue(rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(diags.toString(), rcs.getDiagnostics());
+
+ // remove the container and verify not recovered
+ stateStore.removeContainer(containerId);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertTrue(recoveredContainers.isEmpty());
+ }
+
+ @Test
public void testStartResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Aug 12 11:02:38 2014
@@ -209,7 +209,7 @@ public class TestNMWebServer {
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
"password".getBytes(), currentTime);
Container container =
- new ContainerImpl(conf, dispatcher, launchContext,
+ new ContainerImpl(conf, dispatcher, stateStore, launchContext,
null, metrics,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 12 11:02:38 2014
@@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod
private final RMContext context;
private final String hostName;
private final int commandPort;
- private final int httpPort;
+ private int httpPort;
private final String nodeAddress; // The containerManager address
- private final String httpAddress;
+ private String httpAddress;
private volatile ResourceOption resourceOption;
private final Node node;
@@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- // Kill containers since node is rejoining.
- rmNode.nodeUpdateQueue.clear();
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeRemovedSchedulerEvent(rmNode));
-
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
- if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
- && rmNode.getHttpPort() == newNode.getHttpPort()) {
- // Reset heartbeat ID since node just restarted.
- rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
- if (rmNode.getState() != NodeState.UNHEALTHY) {
- // Only add new node if old state is not UNHEALTHY
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- }
- } else {
- // Reconnected node differs, so replace old node and start new node
- switch (rmNode.getState()) {
- case RUNNING:
- ClusterMetrics.getMetrics().decrNumActiveNodes();
- break;
- case UNHEALTHY:
- ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
- break;
- }
- rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
- rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(newNode.getNodeID(), null, null));
- }
+ rmNode.httpPort = newNode.getHttpPort();
+ rmNode.httpAddress = newNode.getHttpAddress();
+ rmNode.resourceOption = newNode.getResourceOption();
+
+ // Reset heartbeat ID since node just restarted.
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
if (null != reconnectEvent.getRunningApplications()) {
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Tue Aug 12 11:02:38 2014
@@ -153,14 +153,17 @@ public class SchedulerUtils {
* @param rmNode RMNode with new resource view
* @param clusterResource the cluster's resource that need to update
* @param log Scheduler's log for resource change
+ * @return true if the resources have changed
*/
- public static void updateResourceIfChanged(SchedulerNode node,
+ public static boolean updateResourceIfChanged(SchedulerNode node,
RMNode rmNode, Resource clusterResource, Log log) {
+ boolean result = false;
Resource oldAvailableResource = node.getAvailableResource();
Resource newAvailableResource = Resources.subtract(
rmNode.getTotalCapability(), node.getUsedResource());
if (!newAvailableResource.equals(oldAvailableResource)) {
+ result = true;
Resource deltaResource = Resources.subtract(newAvailableResource,
oldAvailableResource);
// Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
+ " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
+ deltaResource.getMemory() +"MB");
}
+
+ return result;
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 12 11:02:38 2014
@@ -783,7 +783,10 @@ public class CapacityScheduler extends
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
- SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+ if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+ LOG)) {
+ root.updateClusterResource(clusterResource);
+ }
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1617450&r1=1617449&r2=1617450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Tue Aug 12 11:02:38 2014
@@ -595,7 +595,7 @@ public class TestResourceTrackerService
// reconnect of node with changed capability
nm1 = rm.registerNode("host2:5678", 10240);
dispatcher.await();
- response = nm2.nodeHeartbeat(true);
+ response = nm1.nodeHeartbeat(true);
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());