You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by wa...@apache.org on 2014/06/19 01:15:11 UTC
svn commit: r1603664 [1/2] - in
/hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/...
Author: wang
Date: Wed Jun 18 23:15:04 2014
New Revision: 1603664
URL: http://svn.apache.org/r1603664
Log:
Merge trunk r1603663 to branch.
Added:
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
- copied unchanged from r1603663, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
- copied unchanged from r1603663, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
Removed:
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
Modified:
hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt Wed Jun 18 23:15:04 2014
@@ -39,6 +39,9 @@ Release 2.5.0 - UNRELEASED
YARN-1702. Added kill app functionality to RM web services. (Varun Vasudev
via vinodkv)
+ YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
+ via junping_du)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
@@ -164,6 +167,9 @@ Release 2.5.0 - UNRELEASED
YARN-2125. Changed ProportionalCapacityPreemptionPolicy to log CSV in debug
level. (Wangda Tan via jianhe)
+ YARN-2159. Better logging in SchedulerNode#allocateContainer.
+ (Ray Chiang via kasha)
+
OPTIMIZATIONS
BUG FIXES
@@ -245,6 +251,14 @@ Release 2.5.0 - UNRELEASED
YARN-2155. FairScheduler: Incorrect threshold check for preemption.
(Wei Yan via kasha)
+ YARN-1885. Fixed a bug that RM may not send application-clean-up signal
+ to NMs where the completed applications previously ran in case of RM restart.
+ (Wangda Tan via jianhe)
+
+ YARN-2167. LeveldbIterator should get closed in
+ NMLeveldbStateStoreService#loadLocalizationState() within finally block
+ (Junping Du via jlowe)
+
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Wed Jun 18 23:15:04 2014
@@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA ext
// make sure registerNodeManager works when failover happens
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
- YarnVersionInfo.getVersion(), null);
+ YarnVersionInfo.getVersion(), null, null);
resourceTracker.registerNodeManager(request);
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Wed Jun 18 23:15:04 2014
@@ -20,15 +20,17 @@ package org.apache.hadoop.yarn.server.ap
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
public abstract class RegisterNodeManagerRequest {
-
+
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
- List<NMContainerStatus> containerStatuses) {
+ List<NMContainerStatus> containerStatuses,
+ List<ApplicationId> runningApplications) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@@ -36,6 +38,7 @@ public abstract class RegisterNodeManage
request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
+ request.setRunningApplications(runningApplications);
return request;
}
@@ -45,10 +48,30 @@ public abstract class RegisterNodeManage
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
+ /**
+ * We introduce this here because currently YARN RM doesn't persist nodes info
+ * for application running. When RM restart happened, we cannot determinate if
+ * a node should do application cleanup (like log-aggregation, status update,
+ * etc.) or not. <p/>
+ * When we have this running application list in node manager register
+ * request, we can recover nodes info for running applications. And then we
+ * can take actions accordingly
+ *
+ * @return running application list in this node
+ */
+ public abstract List<ApplicationId> getRunningApplications();
+
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
public abstract void setContainerStatuses(
List<NMContainerStatus> containerStatuses);
+
+ /**
+ * Setter for {@link RegisterNodeManagerRequest#getRunningApplications()}
+ * @param runningApplications running application in this node
+ */
+ public abstract void setRunningApplications(
+ List<ApplicationId> runningApplications);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Wed Jun 18 23:15:04 2014
@@ -20,12 +20,23 @@ package org.apache.hadoop.yarn.server.ap
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
@@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestP
private Resource resource = null;
private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null;
+ private List<ApplicationId> runningApplications = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -65,6 +77,9 @@ public class RegisterNodeManagerRequestP
if (this.containerStatuses != null) {
addNMContainerStatusesToProto();
}
+ if (this.runningApplications != null) {
+ addRunningApplicationsToProto();
+ }
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
@@ -158,6 +173,66 @@ public class RegisterNodeManagerRequestP
maybeInitBuilder();
builder.setHttpPort(httpPort);
}
+
+ @Override
+ public List<ApplicationId> getRunningApplications() {
+ initRunningApplications();
+ return runningApplications;
+ }
+
+ private void initRunningApplications() {
+ if (this.runningApplications != null) {
+ return;
+ }
+ RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ApplicationIdProto> list = p.getRunningApplicationsList();
+ this.runningApplications = new ArrayList<ApplicationId>();
+ for (ApplicationIdProto c : list) {
+ this.runningApplications.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setRunningApplications(List<ApplicationId> apps) {
+ if (apps == null) {
+ return;
+ }
+ initRunningApplications();
+ this.runningApplications.addAll(apps);
+ }
+
+ private void addRunningApplicationsToProto() {
+ maybeInitBuilder();
+ builder.clearRunningApplications();
+ if (runningApplications == null) {
+ return;
+ }
+ Iterable<ApplicationIdProto> it = new Iterable<ApplicationIdProto>() {
+
+ @Override
+ public Iterator<ApplicationIdProto> iterator() {
+ return new Iterator<ApplicationIdProto>() {
+ Iterator<ApplicationId> iter = runningApplications.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllRunningApplications(it);
+ }
@Override
public List<NMContainerStatus> getNMContainerStatuses() {
@@ -216,6 +291,14 @@ public class RegisterNodeManagerRequestP
maybeInitBuilder();
builder.setNmVersion(version);
}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl)t).getProto();
+ }
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
return new NodeIdPBImpl(p);
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Wed Jun 18 23:15:04 2014
@@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto
optional ResourceProto resource = 4;
optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6;
+ repeated ApplicationIdProto runningApplications = 7;
}
message RegisterNodeManagerResponseProto {
@@ -66,4 +67,4 @@ message NMContainerStatusProto {
optional PriorityProto priority = 4;
optional string diagnostics = 5 [default = "N/A"];
optional int32 container_exit_status = 6;
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java Wed Jun 18 23:15:04 2014
@@ -83,7 +83,8 @@ public class TestProtocolRecords {
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(
NodeId.newInstance("1.1.1.1", 1000), 8080,
- Resource.newInstance(1024, 1), "NM-version-id", reports);
+ Resource.newInstance(1024, 1), "NM-version-id", reports,
+ Arrays.asList(appId));
RegisterNodeManagerRequest requestProto =
new RegisterNodeManagerRequestPBImpl(
((RegisterNodeManagerRequestPBImpl) request).getProto());
@@ -95,5 +96,7 @@ public class TestProtocolRecords {
requestProto.getNodeId());
Assert.assertEquals(Resource.newInstance(1024, 1),
requestProto.getResource());
+ Assert.assertEquals(1, requestProto.getRunningApplications().size());
+ Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Wed Jun 18 23:15:04 2014
@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,6 +57,8 @@ public class DeletionService extends Abs
private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched;
private static final FileContext lfs = getLfs();
+ private final NMStateStoreService stateStore;
+ private AtomicInteger nextTaskId = new AtomicInteger(0);
static final FileContext getLfs() {
try {
@@ -60,14 +69,18 @@ public class DeletionService extends Abs
}
public DeletionService(ContainerExecutor exec) {
+ this(exec, new NMNullStateStoreService());
+ }
+
+ public DeletionService(ContainerExecutor exec,
+ NMStateStoreService stateStore) {
super(DeletionService.class.getName());
this.exec = exec;
this.debugDelay = 0;
+ this.stateStore = stateStore;
}
/**
- *
- /**
* Delete the path(s) as this user.
* @param user The user to delete as, or the JVM user if null
* @param subDir the sub directory name
@@ -76,19 +89,20 @@ public class DeletionService extends Abs
public void delete(String user, Path subDir, Path... baseDirs) {
// TODO if parent owned by NM, rename within parent inline
if (debugDelay != -1) {
- if (baseDirs == null || baseDirs.length == 0) {
- sched.schedule(new FileDeletionTask(this, user, subDir, null),
- debugDelay, TimeUnit.SECONDS);
- } else {
- sched.schedule(
- new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
- debugDelay, TimeUnit.SECONDS);
+ List<Path> baseDirList = null;
+ if (baseDirs != null && baseDirs.length != 0) {
+ baseDirList = Arrays.asList(baseDirs);
}
+ FileDeletionTask task =
+ new FileDeletionTask(this, user, subDir, baseDirList);
+ recordDeletionTaskInStateStore(task);
+ sched.schedule(task, debugDelay, TimeUnit.SECONDS);
}
}
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
if (debugDelay != -1) {
+ recordDeletionTaskInStateStore(fileDeletionTask);
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
}
}
@@ -109,6 +123,9 @@ public class DeletionService extends Abs
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
+ if (stateStore.canRecover()) {
+ recover(stateStore.loadDeletionServiceState());
+ }
super.serviceInit(conf);
}
@@ -139,6 +156,8 @@ public class DeletionService extends Abs
}
public static class FileDeletionTask implements Runnable {
+ public static final int INVALID_TASK_ID = -1;
+ private int taskId;
private final String user;
private final Path subDir;
private final List<Path> baseDirs;
@@ -152,6 +171,12 @@ public class DeletionService extends Abs
private FileDeletionTask(DeletionService delService, String user,
Path subDir, List<Path> baseDirs) {
+ this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
+ }
+
+ private FileDeletionTask(int taskId, DeletionService delService,
+ String user, Path subDir, List<Path> baseDirs) {
+ this.taskId = taskId;
this.delService = delService;
this.user = user;
this.subDir = subDir;
@@ -198,6 +223,12 @@ public class DeletionService extends Abs
return this.success;
}
+ public synchronized FileDeletionTask[] getSuccessorTasks() {
+ FileDeletionTask[] successors =
+ new FileDeletionTask[successorTaskSet.size()];
+ return successorTaskSet.toArray(successors);
+ }
+
@Override
public void run() {
if (LOG.isDebugEnabled()) {
@@ -286,6 +317,12 @@ public class DeletionService extends Abs
* dependent tasks of it has failed marking its success = false.
*/
private synchronized void fileDeletionTaskFinished() {
+ try {
+ delService.stateStore.removeDeletionTask(taskId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove deletion task " + taskId
+ + " from state store", e);
+ }
Iterator<FileDeletionTask> successorTaskI =
this.successorTaskSet.iterator();
while (successorTaskI.hasNext()) {
@@ -318,4 +355,129 @@ public class DeletionService extends Abs
Path[] baseDirs) {
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
}
+
+ private void recover(RecoveredDeletionServiceState state)
+ throws IOException {
+ List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
+ Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
+ new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
+ Set<Integer> successorTasks = new HashSet<Integer>();
+ for (DeletionServiceDeleteTaskProto proto : taskProtos) {
+ DeletionTaskRecoveryInfo info = parseTaskProto(proto);
+ idToInfoMap.put(info.task.taskId, info);
+ nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
+ successorTasks.addAll(info.successorTaskIds);
+ }
+
+ // restore the task dependencies and schedule the deletion tasks that
+ // have no predecessors
+ final long now = System.currentTimeMillis();
+ for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
+ for (Integer successorId : info.successorTaskIds){
+ DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
+ if (successor != null) {
+ info.task.addFileDeletionTaskDependency(successor.task);
+ } else {
+ LOG.error("Unable to locate dependency task for deletion task "
+ + info.task.taskId + " at " + info.task.getSubDir());
+ }
+ }
+ if (!successorTasks.contains(info.task.taskId)) {
+ long msecTilDeletion = info.deletionTimestamp - now;
+ sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private DeletionTaskRecoveryInfo parseTaskProto(
+ DeletionServiceDeleteTaskProto proto) throws IOException {
+ int taskId = proto.getId();
+ String user = proto.hasUser() ? proto.getUser() : null;
+ Path subdir = null;
+ List<Path> basePaths = null;
+ if (proto.hasSubdir()) {
+ subdir = new Path(proto.getSubdir());
+ }
+ List<String> basedirs = proto.getBasedirsList();
+ if (basedirs != null && basedirs.size() > 0) {
+ basePaths = new ArrayList<Path>(basedirs.size());
+ for (String basedir : basedirs) {
+ basePaths.add(new Path(basedir));
+ }
+ }
+
+ FileDeletionTask task = new FileDeletionTask(taskId, this, user,
+ subdir, basePaths);
+ return new DeletionTaskRecoveryInfo(task,
+ proto.getSuccessorIdsList(),
+ proto.getDeletionTime());
+ }
+
+ private int generateTaskId() {
+ // get the next ID but avoid an invalid ID
+ int taskId = nextTaskId.incrementAndGet();
+ while (taskId == FileDeletionTask.INVALID_TASK_ID) {
+ taskId = nextTaskId.incrementAndGet();
+ }
+ return taskId;
+ }
+
+ private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+ if (!stateStore.canRecover()) {
+ // optimize the case where we aren't really recording
+ return;
+ }
+ if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+ return; // task already recorded
+ }
+
+ task.taskId = generateTaskId();
+
+ FileDeletionTask[] successors = task.getSuccessorTasks();
+
+ // store successors first to ensure task IDs have been generated for them
+ for (FileDeletionTask successor : successors) {
+ recordDeletionTaskInStateStore(successor);
+ }
+
+ DeletionServiceDeleteTaskProto.Builder builder =
+ DeletionServiceDeleteTaskProto.newBuilder();
+ builder.setId(task.taskId);
+ if (task.getUser() != null) {
+ builder.setUser(task.getUser());
+ }
+ if (task.getSubDir() != null) {
+ builder.setSubdir(task.getSubDir().toString());
+ }
+ builder.setDeletionTime(System.currentTimeMillis() +
+ TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
+ if (task.getBaseDirs() != null) {
+ for (Path dir : task.getBaseDirs()) {
+ builder.addBasedirs(dir.toString());
+ }
+ }
+ for (FileDeletionTask successor : successors) {
+ builder.addSuccessorIds(successor.taskId);
+ }
+
+ try {
+ stateStore.storeDeletionTask(task.taskId, builder.build());
+ } catch (IOException e) {
+ LOG.error("Unable to store deletion task " + task.taskId + " for "
+ + task.getSubDir(), e);
+ }
+ }
+
+ private static class DeletionTaskRecoveryInfo {
+ FileDeletionTask task;
+ List<Integer> successorTaskIds;
+ long deletionTimestamp;
+
+ public DeletionTaskRecoveryInfo(FileDeletionTask task,
+ List<Integer> successorTaskIds, long deletionTimestamp) {
+ this.task = task;
+ this.successorTaskIds = successorTaskIds;
+ this.deletionTimestamp = deletionTimestamp;
+ }
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Jun 18 23:15:04 2014
@@ -114,7 +114,7 @@ public class NodeManager extends Composi
}
protected DeletionService createDeletionService(ContainerExecutor exec) {
- return new DeletionService(exec);
+ return new DeletionService(exec, nmStore);
}
protected NMContext createNMContext(
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Jun 18 23:15:04 2014
@@ -250,7 +250,7 @@ public class NodeStatusUpdaterImpl exten
List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerReports);
+ nodeManagerVersionId, containerReports, getRunningApplications());
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
@@ -374,6 +374,12 @@ public class NodeStatusUpdaterImpl exten
}
return containerStatuses;
}
+
+ private List<ApplicationId> getRunningApplications() {
+ List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
+ runningApplications.addAll(this.context.getApplications().keySet());
+ return runningApplications;
+ }
// These NMContainerStatus are sent on NM registration and used by YARN only.
private List<NMContainerStatus> getNMContainerStatuses() {
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Wed Jun 18 23:15:04 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,6 +59,9 @@ public class NMLeveldbStateStoreService
private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
private static final String DB_SCHEMA_VERSION = "1.0";
+ private static final String DELETION_TASK_KEY_PREFIX =
+ "DeletionService/deltask_";
+
private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
LOCALIZATION_KEY_PREFIX + "public/";
@@ -91,8 +95,9 @@ public class NMLeveldbStateStoreService
throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState();
+ LeveldbIterator iter = null;
try {
- LeveldbIterator iter = new LeveldbIterator(db);
+ iter = new LeveldbIterator(db);
iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
state.publicTrackerState = loadResourceTrackerState(iter,
LOCALIZATION_PUBLIC_KEY_PREFIX);
@@ -118,6 +123,10 @@ public class NMLeveldbStateStoreService
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
}
return state;
@@ -309,6 +318,56 @@ public class NMLeveldbStateStoreService
@Override
+ public RecoveredDeletionServiceState loadDeletionServiceState()
+ throws IOException {
+ RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
+ state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
+ while (iter.hasNext()) {
+ Entry<byte[], byte[]> entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
+ break;
+ }
+ state.tasks.add(
+ DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
+ }
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ return state;
+ }
+
+ @Override
+ public void storeDeletionTask(int taskId,
+ DeletionServiceDeleteTaskProto taskProto) throws IOException {
+ String key = DELETION_TASK_KEY_PREFIX + taskId;
+ try {
+ db.put(bytes(key), taskProto.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void removeDeletionTask(int taskId) throws IOException {
+ String key = DELETION_TASK_KEY_PREFIX + taskId;
+ try {
+ db.delete(bytes(key));
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+
+ @Override
protected void initStorage(Configuration conf)
throws IOException {
Path storeRoot = createStorageDir(conf);
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Wed Jun 18 23:15:04 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
// The state store to use when state isn't being stored
@@ -61,6 +62,22 @@ public class NMNullStateStoreService ext
}
@Override
+ public RecoveredDeletionServiceState loadDeletionServiceState()
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ @Override
+ public void storeDeletionTask(int taskId,
+ DeletionServiceDeleteTaskProto taskProto) throws IOException {
+ }
+
+ @Override
+ public void removeDeletionTask(int taskId) throws IOException {
+ }
+
+ @Override
protected void initStorage(Configuration conf) throws IOException {
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Wed Jun 18 23:15:04 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@Private
@@ -91,6 +92,14 @@ public abstract class NMStateStoreServic
}
}
+ public static class RecoveredDeletionServiceState {
+ List<DeletionServiceDeleteTaskProto> tasks;
+
+ public List<DeletionServiceDeleteTaskProto> getTasks() {
+ return tasks;
+ }
+ }
+
/** Initialize the state storage */
@Override
public void serviceInit(Configuration conf) throws IOException {
@@ -155,6 +164,15 @@ public abstract class NMStateStoreServic
ApplicationId appId, Path localPath) throws IOException;
+ public abstract RecoveredDeletionServiceState loadDeletionServiceState()
+ throws IOException;
+
+ public abstract void storeDeletionTask(int taskId,
+ DeletionServiceDeleteTaskProto taskProto) throws IOException;
+
+ public abstract void removeDeletionTask(int taskId) throws IOException;
+
+
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Wed Jun 18 23:15:04 2014
@@ -24,6 +24,15 @@ package hadoop.yarn;
import "yarn_protos.proto";
+message DeletionServiceDeleteTaskProto {
+ optional int32 id = 1;
+ optional string user = 2;
+ optional string subdir = 3;
+ optional int64 deletionTime = 4;
+ repeated string basedirs = 5;
+ repeated int32 successorIds = 6;
+}
+
message LocalizedResourceProto {
optional LocalResourceProto resource = 1;
optional string localPath = 2;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Wed Jun 18 23:15:04 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -285,4 +286,58 @@ public class TestDeletionService {
del.stop();
}
}
+
+ @Test
+ public void testRecovery() throws Exception {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ List<Path> baseDirs = buildDirs(r, base, 4);
+ createDirs(new Path("."), baseDirs);
+ List<Path> content = buildDirs(r, new Path("."), 10);
+ for (Path b : baseDirs) {
+ createDirs(b, content);
+ }
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 1);
+ NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ DeletionService del =
+ new DeletionService(new FakeDefaultContainerExecutor(), stateStore);
+ try {
+ del.init(conf);
+ del.start();
+ for (Path p : content) {
+ assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
+ del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
+ p, baseDirs.toArray(new Path[4]));
+ }
+
+ // restart the deletion service
+ del.stop();
+ del = new DeletionService(new FakeDefaultContainerExecutor(),
+ stateStore);
+ del.init(conf);
+ del.start();
+
+ // verify paths are still eventually deleted
+ int msecToWait = 10 * 1000;
+ for (Path p : baseDirs) {
+ for (Path q : content) {
+ Path fp = new Path(p, q);
+ while (msecToWait > 0 && lfs.util().exists(fp)) {
+ Thread.sleep(100);
+ msecToWait -= 100;
+ }
+ assertFalse(lfs.util().exists(fp));
+ }
+ }
+ } finally {
+ del.close();
+ stateStore.close();
+ }
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Wed Jun 18 23:15:04 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -25,10 +27,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<TrackerKey, TrackerState> trackerStates;
+ private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
public NMMemoryStateStoreService() {
super(NMMemoryStateStoreService.class.getName());
@@ -110,6 +114,7 @@ public class NMMemoryStateStoreService e
@Override
protected void initStorage(Configuration conf) {
trackerStates = new HashMap<TrackerKey, TrackerState>();
+ deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
}
@Override
@@ -121,6 +126,28 @@ public class NMMemoryStateStoreService e
}
+ @Override
+ public RecoveredDeletionServiceState loadDeletionServiceState()
+ throws IOException {
+ RecoveredDeletionServiceState result =
+ new RecoveredDeletionServiceState();
+ result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
+ deleteTasks.values());
+ return result;
+ }
+
+ @Override
+ public synchronized void storeDeletionTask(int taskId,
+ DeletionServiceDeleteTaskProto taskProto) throws IOException {
+ deleteTasks.put(taskId, taskProto);
+ }
+
+ @Override
+ public synchronized void removeDeletionTask(int taskId) throws IOException {
+ deleteTasks.remove(taskId);
+ }
+
+
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Wed Jun 18 23:15:04 2014
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -404,4 +406,58 @@ public class TestNMLeveldbStateStoreServ
state.getUserResources();
assertTrue(userResources.isEmpty());
}
+
+ @Test
+ public void testDeletionTaskStorage() throws IOException {
+ // test empty when no state
+ RecoveredDeletionServiceState state =
+ stateStore.loadDeletionServiceState();
+ assertTrue(state.getTasks().isEmpty());
+
+ // store a deletion task and verify recovered
+ DeletionServiceDeleteTaskProto proto =
+ DeletionServiceDeleteTaskProto.newBuilder()
+ .setId(7)
+ .setUser("someuser")
+ .setSubdir("some/subdir")
+ .addBasedirs("some/dir/path")
+ .addBasedirs("some/other/dir/path")
+ .setDeletionTime(123456L)
+ .addSuccessorIds(8)
+ .addSuccessorIds(9)
+ .build();
+ stateStore.storeDeletionTask(proto.getId(), proto);
+ restartStateStore();
+ state = stateStore.loadDeletionServiceState();
+ assertEquals(1, state.getTasks().size());
+ assertEquals(proto, state.getTasks().get(0));
+
+ // store another deletion task
+ DeletionServiceDeleteTaskProto proto2 =
+ DeletionServiceDeleteTaskProto.newBuilder()
+ .setId(8)
+ .setUser("user2")
+ .setSubdir("subdir2")
+ .setDeletionTime(789L)
+ .build();
+ stateStore.storeDeletionTask(proto2.getId(), proto2);
+ restartStateStore();
+ state = stateStore.loadDeletionServiceState();
+ assertEquals(2, state.getTasks().size());
+ assertTrue(state.getTasks().contains(proto));
+ assertTrue(state.getTasks().contains(proto2));
+
+ // delete a task and verify gone after recovery
+ stateStore.removeDeletionTask(proto2.getId());
+ restartStateStore();
+ state = stateStore.loadDeletionServiceState();
+ assertEquals(1, state.getTasks().size());
+ assertEquals(proto, state.getTasks().get(0));
+
+ // delete the last task and verify none left
+ stateStore.removeDeletionTask(proto.getId());
+ restartStateStore();
+ state = stateStore.loadDeletionServiceState();
+ assertTrue(state.getTasks().isEmpty());
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Jun 18 23:15:04 2014
@@ -244,15 +244,6 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!rmContext.isWorkPreservingRecoveryEnabled()) {
- if (!request.getNMContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getNMContainerStatuses());
- for (NMContainerStatus status : request.getNMContainerStatuses()) {
- handleNMContainerStatus(status);
- }
- }
- }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -311,7 +302,8 @@ public class ResourceTrackerService exte
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
+ new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+ request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
@@ -322,6 +314,18 @@ public class ResourceTrackerService exte
// present for any running application.
this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
+
+ // Handle received container status, this should be processed after new
+ // RMNode inserted
+ if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+ if (!request.getNMContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getNMContainerStatuses());
+ for (NMContainerStatus status : request.getNMContainerStatuses()) {
+ handleNMContainerStatus(status);
+ }
+ }
+ }
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Jun 18 23:15:04 2014
@@ -19,16 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
-
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -208,6 +208,14 @@ public interface RMApp extends EventHand
* @return the flag indicating whether the applications's state is stored.
*/
boolean isAppFinalStateStored();
+
+
+ /**
+ * Nodes on which the containers for this {@link RMApp} ran.
+ * @return the set of nodes that ran any containers from this {@link RMApp}
+ * Add more node on which containers for this {@link RMApp} ran
+ */
+ Set<NodeId> getRanNodes();
/**
* Create the external user-facing state of ApplicationMaster from the
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Wed Jun 18 23:15:04 2014
@@ -38,6 +38,9 @@ public enum RMAppEventType {
ATTEMPT_FAILED,
ATTEMPT_KILLED,
NODE_UPDATE,
+
+ // Source: Container and ResourceTracker
+ APP_RUNNING_ON_NODE,
// Source: RMStateStore
APP_NEW_SAVED,
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Jun 18 23:15:04 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp,
private EventHandler handler;
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
+ private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
@@ -180,7 +181,6 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED))
-
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -200,6 +200,9 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
// recovery the app returns ACCEPTED state and the app once again go
// through the scheduler and triggers one more APP_ACCEPTED event at
@@ -220,6 +223,9 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
// UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.RUNNING,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
@@ -235,6 +241,9 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_FINISHED,
new AttemptFinishedAtFinalSavingTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -243,6 +252,9 @@ public class RMAppImpl implements RMApp,
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -251,6 +263,9 @@ public class RMAppImpl implements RMApp,
RMAppEventType.KILL))
// Transitions from KILLING state
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -267,6 +282,9 @@ public class RMAppImpl implements RMApp,
// Transitions from FINISHED state
// ignorable transitions
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
EnumSet.of(
RMAppEventType.NODE_UPDATE,
@@ -276,11 +294,17 @@ public class RMAppImpl implements RMApp,
// Transitions from FAILED state
// ignorable transitions
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
@@ -695,6 +719,23 @@ public class RMAppImpl implements RMApp,
nodeUpdateEvent.getNode());
};
}
+
+ private static final class AppRunningOnNodeTransition extends RMAppTransition {
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
+
+ // if final state already stored, notify RMNode
+ if (isAppInFinalState(app)) {
+ app.handler.handle(
+ new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
+ .getApplicationId()));
+ return;
+ }
+
+ // otherwise, add it to ranNodes for further process
+ app.ranNodes.add(nodeAddedEvent.getNodeId());
+ };
+ }
/**
* Move an app to a new queue.
@@ -1037,17 +1078,8 @@ public class RMAppImpl implements RMApp,
this.finalState = finalState;
}
- private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
- Set<NodeId> nodes = new HashSet<NodeId>();
- for (RMAppAttempt attempt : app.attempts.values()) {
- nodes.addAll(attempt.getRanNodes());
- }
- return nodes;
- }
-
public void transition(RMAppImpl app, RMAppEvent event) {
- Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
- for (NodeId nodeId : nodes) {
+ for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
@@ -1148,4 +1180,9 @@ public class RMAppImpl implements RMApp,
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
+
+ @Override
+ public Set<NodeId> getRanNodes() {
+ return ranNodes;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Wed Jun 18 23:15:04 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
-import java.util.Set;
import javax.crypto.SecretKey;
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -115,12 +113,6 @@ public interface RMAppAttempt extends Ev
FinalApplicationStatus getFinalApplicationStatus();
/**
- * Nodes on which the containers for this {@link RMAppAttempt} ran.
- * @return the set of nodes that ran any containers from this {@link RMAppAttempt}
- */
- Set<NodeId> getRanNodes();
-
- /**
* Return a list of the last set of finished containers, resetting the
* finished containers to empty.
* @return the list of just finished containers, re setting the finished containers.
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Wed Jun 18 23:15:04 2014
@@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
UNREGISTERED,
// Source: Containers
- CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jun 18 23:15:04 2014
@@ -26,16 +26,13 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
+
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements
private final ApplicationSubmissionContext submissionContext;
private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null;
-
- //nodes on while this attempt's containers ran
- private Set<NodeId> ranNodes =
- new HashSet<NodeId>();
+
private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
@@ -219,10 +213,7 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
- .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
+
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING,
@@ -249,10 +240,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
- .addTransition(RMAppAttemptState.ALLOCATED,
- RMAppAttemptState.ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
@@ -297,10 +284,6 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ALLOCATED)
.addTransition(
- RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
- .addTransition(
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
@@ -337,7 +320,6 @@ public class RMAppAttemptImpl implements
// should be fixed to reject container allocate request at Final
// Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL))
@@ -620,11 +602,6 @@ public class RMAppAttemptImpl implements
}
@Override
- public Set<NodeId> getRanNodes() {
- return ranNodes;
- }
-
- @Override
public Container getMasterContainer() {
this.readLock.lock();
@@ -705,7 +682,6 @@ public class RMAppAttemptImpl implements
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers();
- this.ranNodes = attempt.getRanNodes();
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -1402,17 +1378,6 @@ public class RMAppAttemptImpl implements
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
- private static final class ContainerAcquiredTransition extends
- BaseTransition {
- @Override
- public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
- RMAppAttemptContainerAcquiredEvent acquiredEvent
- = (RMAppAttemptContainerAcquiredEvent) event;
- appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
- }
- }
-
private static final class ContainerFinishedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Jun 18 23:15:04 2014
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -365,9 +365,9 @@ public class RMContainerImpl implements
RMContainerEventType.FINISHED));
return RMContainerState.COMPLETED;
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
- // Tell the appAttempt
- container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
- container.getApplicationAttemptId(), container.getContainer()));
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
return RMContainerState.RUNNING;
} else {
// This can never happen.
@@ -408,9 +408,9 @@ public class RMContainerImpl implements
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainerId());
- // Tell the appAttempt
- container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
- container.getApplicationAttemptId(), container.getContainer()));
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
}
}