You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC
svn commit: r1602947 [2/5] - in
/hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,17 +30,18 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
-
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -53,10 +58,12 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -185,6 +192,9 @@ public class TestNodeManagerResync {
TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
final Container container =
TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
+ NMContainerStatus report =
+ createNMContainerStatus(2, ContainerState.COMPLETE);
+ when(container.getNMContainerStatus()).thenReturn(report);
NodeManager nm = new NodeManager() {
int registerCount = 0;
@@ -203,7 +213,7 @@ public class TestNodeManagerResync {
if (registerCount == 0) {
// first register, no containers info.
try {
- Assert.assertEquals(0, request.getContainerStatuses()
+ Assert.assertEquals(0, request.getNMContainerStatuses()
.size());
} catch (AssertionError error) {
error.printStackTrace();
@@ -214,8 +224,8 @@ public class TestNodeManagerResync {
testCompleteContainer.getContainerId(), container);
} else {
// second register contains the completed container info.
- List<ContainerStatus> statuses =
- request.getContainerStatuses();
+ List<NMContainerStatus> statuses =
+ request.getNMContainerStatuses();
try {
Assert.assertEquals(1, statuses.size());
Assert.assertEquals(testCompleteContainer.getContainerId(),
@@ -510,4 +520,16 @@ public class TestNodeManagerResync {
}
}
}}
+
+ public static NMContainerStatus createNMContainerStatus(int id,
+ ContainerState containerState) {
+ ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 1);
+ ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id);
+ NMContainerStatus containerReport =
+ NMContainerStatus.newInstance(containerId, containerState,
+ Resource.newInstance(1024, 1), "recover container", 0);
+ return containerReport;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Jun 16 18:13:57 2014
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.junit.Assert;
import org.apache.commons.logging.LogFactory;
@@ -68,7 +69,6 @@ import org.apache.hadoop.yarn.security.C
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
@@ -348,8 +348,7 @@ public class TestContainerManager extend
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
- int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
- ExitCode.TERMINATED.getExitCode();
+ int expectedExitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
// Assert that the process is not alive anymore
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Jun 16 18:13:57 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -319,7 +320,7 @@ public class TestContainer {
assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.killContainer();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
- assertEquals(ExitCode.TERMINATED.getExitCode(),
+ assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
wc.c.cloneAndGetContainerStatus().getExitStatus());
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
.contains("KillRequest"));
@@ -339,7 +340,7 @@ public class TestContainer {
assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
- assertEquals(ExitCode.TERMINATED.getExitCode(),
+ assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
wc.c.cloneAndGetContainerStatus().getExitStatus());
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
.contains("KillRequest"));
@@ -898,12 +899,14 @@ public class TestContainer {
}
public void killContainer() {
- c.handle(new ContainerKillEvent(cId, "KillRequest"));
+ c.handle(new ContainerKillEvent(cId,
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ "KillRequest"));
drainDispatcherEvents();
}
public void containerKilledOnRequest() {
- int exitCode = ExitCode.FORCE_KILLED.getExitCode();
+ int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
String diagnosticMsg = "Container completed with exit code " + exitCode;
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -73,7 +74,6 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -604,8 +604,7 @@ public class TestContainerLaunch extends
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
- int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
- ExitCode.TERMINATED.getExitCode();
+ int expectedExitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
// Assert that the process is not alive anymore
@@ -717,7 +716,7 @@ public class TestContainerLaunch extends
ContainerStatus containerStatus =
containerManager.getContainerStatuses(gcsRequest)
.getContainerStatuses().get(0);
- Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
+ Assert.assertEquals(ContainerExitStatus.KILLED_BY_APPMASTER,
containerStatus.getExitStatus());
// Now verify the contents of the file. Script generates a message when it
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -60,7 +61,6 @@ import org.apache.hadoop.yarn.event.Asyn
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -270,7 +270,7 @@ public class TestContainersMonitor exten
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
- Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+ Assert.assertEquals(ContainerExitStatus.KILLED_EXCEEDED_VMEM,
containerStatus.getExitStatus());
String expectedMsgPattern =
"Container \\[pid=" + pid + ",containerID=" + cId
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Mon Jun 16 18:13:57 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public class MockContainer implements Co
public ContainerTokenIdentifier getContainerTokenIdentifier() {
return this.containerTokenIdentifier;
}
+
+ @Override
+ public NMContainerStatus getNMContainerStatus() {
+ return null;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Mon Jun 16 18:13:57 2014
@@ -99,4 +99,6 @@ public interface RMContext {
RMApplicationHistoryWriter rmApplicationHistoryWriter);
ConfigurationProvider getConfigurationProvider();
+
+ boolean isWorkPreservingRecoveryEnabled();
}
\ No newline at end of file
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Mon Jun 16 18:13:57 2014
@@ -60,6 +60,7 @@ public class RMContextImpl implements RM
= new ConcurrentHashMap<String, RMNode>();
private boolean isHAEnabled;
+ private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
@@ -329,6 +330,15 @@ public class RMContextImpl implements RM
}
}
+ public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+ this.isWorkPreservingRecoveryEnabled = enabled;
+ }
+
+ @Override
+ public boolean isWorkPreservingRecoveryEnabled() {
+ return this.isWorkPreservingRecoveryEnabled;
+ }
+
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Mon Jun 16 18:13:57 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Utility methods to aid serving RM data through the REST and RPC APIs
@@ -225,4 +228,13 @@ public class RMServerUtils {
}
}
+ /**
+ * Statically defined dummy ApplicationResourceUsageReport. Used as
+ * a return value when a valid report cannot be found.
+ */
+ public static final ApplicationResourceUsageReport
+ DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
+ BuilderUtils.newApplicationResourceUsageReport(-1, -1,
+ Resources.createResource(-1, -1), Resources.createResource(-1, -1),
+ Resources.createResource(-1, -1));
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Jun 16 18:13:57 2014
@@ -327,7 +327,7 @@ public class ResourceManager extends Com
* RMActiveServices handles all the Active services in the RM.
*/
@Private
- class RMActiveServices extends CompositeService {
+ public class RMActiveServices extends CompositeService {
private DelegationTokenRenewer delegationTokenRenewer;
private EventHandler<SchedulerEvent> schedulerDispatcher;
@@ -364,9 +364,15 @@ public class ResourceManager extends Com
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
- if(isRecoveryEnabled) {
+ if (isRecoveryEnabled) {
recoveryEnabled = true;
- rmStore = RMStateStoreFactory.getStore(conf);
+ rmStore = RMStateStoreFactory.getStore(conf);
+ boolean isWorkPreservingRecoveryEnabled =
+ conf.getBoolean(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+ rmContext
+ .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
@@ -401,6 +407,8 @@ public class ResourceManager extends Com
// Initialize the scheduler
scheduler = createScheduler();
+ scheduler.setRMContext(rmContext);
+ addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +437,6 @@ public class ResourceManager extends Com
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
- try {
- scheduler.reinitialize(conf, rmContext);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to initialize scheduler", ioe);
- }
-
// creating monitors that handle preemption
createPolicyMonitors();
@@ -524,11 +526,9 @@ public class ResourceManager extends Com
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
- policy.init(conf, rmContext.getDispatcher().getEventHandler(),
- (PreemptableResourceScheduler) scheduler);
// periodically check whether we need to take action to guarantee
// constraints
- SchedulingMonitor mon = new SchedulingMonitor(policy);
+ SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
} else {
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Jun 16 18:13:57 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -195,7 +196,7 @@ public class ResourceTrackerService exte
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
- void handleContainerStatus(ContainerStatus containerStatus) {
+ void handleNMContainerStatus(NMContainerStatus containerStatus) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
@@ -219,11 +220,14 @@ public class ResourceTrackerService exte
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
- && containerStatus.getState() == ContainerState.COMPLETE) {
+ && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(containerStatus.getContainerId(),
+ containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+ containerStatus.getContainerExitStatus());
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
- new RMAppAttemptContainerFinishedEvent(appAttemptId,
- containerStatus);
+ new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
@@ -240,11 +244,13 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!request.getContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getContainerStatuses());
- for (ContainerStatus containerStatus : request.getContainerStatuses()) {
- handleContainerStatus(containerStatus);
+ if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+ if (!request.getNMContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getNMContainerStatuses());
+ for (NMContainerStatus status : request.getNMContainerStatuses()) {
+ handleNMContainerStatus(status);
+ }
}
}
RegisterNodeManagerResponse response = recordFactory
@@ -305,7 +311,7 @@ public class ResourceTrackerService exte
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+ new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Mon Jun 16 18:13:57 2014
@@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import com.google.common.annotations.VisibleForTesting;
@@ -34,18 +36,29 @@ public class SchedulingMonitor extends A
private Thread checkerThread;
private volatile boolean stopped;
private long monitorInterval;
+ private RMContext rmContext;
- public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+ public SchedulingMonitor(RMContext rmContext,
+ SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy;
- this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+ this.rmContext = rmContext;
}
public long getMonitorInterval() {
return monitorInterval;
}
+
+ @VisibleForTesting
+ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
+ return scheduleEditPolicy;
+ }
+ @SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
+ scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+ (PreemptableResourceScheduler) rmContext.getScheduler());
+ this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Mon Jun 16 18:13:57 2014
@@ -165,6 +165,11 @@ public class ProportionalCapacityPreempt
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
}
+
+ @VisibleForTesting
+ public ResourceCalculator getResourceCalculator() {
+ return rc;
+ }
@Override
public void editSchedule(){
@@ -203,7 +208,9 @@ public class ProportionalCapacityPreempt
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(queues, clusterResources);
- logToCSV(queues);
+ if (LOG.isDebugEnabled()) {
+ logToCSV(queues);
+ }
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
@@ -603,7 +610,7 @@ public class ProportionalCapacityPreempt
sb.append(", ");
tq.appendLogString(sb);
}
- LOG.info(sb.toString());
+ LOG.debug(sb.toString());
}
/**
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -314,7 +316,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath);
@@ -334,7 +336,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@@ -354,7 +356,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -375,7 +377,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import com.google.common.annotations.VisibleForTesting;
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends
@Override
public void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData)
+ ApplicationStateData appStateData)
throws Exception {
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
@@ -92,7 +92,7 @@ public class MemoryRMStateStore extends
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
@@ -112,7 +112,7 @@ public class MemoryRMStateStore extends
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
@@ -137,7 +137,7 @@ public class MemoryRMStateStore extends
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -25,9 +25,9 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable
public class NullRMStateStore extends RMStateStore {
@@ -54,13 +54,13 @@ public class NullRMStateStore extends RM
@Override
protected void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
// Do nothing
}
@Override
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// Do nothing
}
@@ -102,13 +102,13 @@ public class NullRMStateStore extends RM
@Override
protected void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
// Do nothing
}
@Override
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
}
@Override
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Jun 16 18:13:57 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
@Private
@Unstable
@@ -83,8 +87,163 @@ public abstract class RMStateStore exten
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+ private enum RMStateStoreState {
+ DEFAULT
+ };
+
+ private static final StateMachineFactory<RMStateStore,
+ RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent>
+ stateMachineFactory = new StateMachineFactory<RMStateStore,
+ RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent>(
+ RMStateStoreState.DEFAULT)
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.STORE_APP, new StoreAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+
+ private final StateMachine<RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent> stateMachine;
+
+ private static class StoreAppTransition
+ implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
+ ApplicationId appId = appState.getAppId();
+ ApplicationStateData appStateData = ApplicationStateData
+ .newInstance(appState);
+ LOG.info("Storing info for app: " + appId);
+ try {
+ store.storeApplicationStateInternal(appId, appStateData);
+ store.notifyDoneStoringApplication(appId, null);
+ } catch (Exception e) {
+ LOG.error("Error storing app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class UpdateAppTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateUpdateAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
+ ApplicationId appId = appState.getAppId();
+ ApplicationStateData appStateData = ApplicationStateData
+ .newInstance(appState);
+ LOG.info("Updating info for app: " + appId);
+ try {
+ store.updateApplicationStateInternal(appId, appStateData);
+ store.notifyDoneUpdatingApplication(appId, null);
+ } catch (Exception e) {
+ LOG.error("Error updating app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class RemoveAppTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreRemoveAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
+ .getAppState();
+ ApplicationId appId = appState.getAppId();
+ LOG.info("Removing info for app: " + appId);
+ try {
+ store.removeApplicationStateInternal(appState);
+ } catch (Exception e) {
+ LOG.error("Error removing app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class StoreAppAttemptTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreAppAttemptEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationAttemptState attemptState =
+ ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+ try {
+ ApplicationAttemptStateData attemptStateData =
+ ApplicationAttemptStateData.newInstance(attemptState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+ }
+ store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
+ store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+ null);
+ } catch (Exception e) {
+ LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class UpdateAppAttemptTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationAttemptState attemptState =
+ ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+ try {
+ ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
+ .newInstance(attemptState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
+ }
+ store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
+ store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
+ null);
+ } catch (Exception e) {
+ LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
public RMStateStore() {
super(RMStateStore.class.getName());
+ stateMachine = stateMachineFactory.make(this);
}
/**
@@ -390,10 +549,10 @@ public abstract class RMStateStore exten
* application.
*/
protected abstract void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception;
+ ApplicationStateData appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception;
+ ApplicationStateData appStateData) throws Exception;
@SuppressWarnings("unchecked")
/**
@@ -428,11 +587,11 @@ public abstract class RMStateStore exten
*/
protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+ ApplicationAttemptStateData attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+ ApplicationAttemptStateData attemptStateData) throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
@@ -596,105 +755,10 @@ public abstract class RMStateStore exten
// Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)
- || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
- ApplicationState appState = null;
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- appState = ((RMStateStoreAppEvent) event).getAppState();
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
- appState = ((RMStateUpdateAppEvent) event).getAppState();
- }
-
- Exception storedException = null;
- ApplicationStateDataPBImpl appStateData =
- (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
- .newApplicationStateData(appState.getSubmitTime(),
- appState.getStartTime(), appState.getUser(),
- appState.getApplicationSubmissionContext(), appState.getState(),
- appState.getDiagnostics(), appState.getFinishTime());
-
- ApplicationId appId =
- appState.getApplicationSubmissionContext().getApplicationId();
-
- LOG.info("Storing info for app: " + appId);
- try {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- storeApplicationStateInternal(appId, appStateData);
- notifyDoneStoringApplication(appId, storedException);
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
- updateApplicationStateInternal(appId, appStateData);
- notifyDoneUpdatingApplication(appId, storedException);
- }
- } catch (Exception e) {
- LOG.error("Error storing/updating app: " + appId, e);
- notifyStoreOperationFailed(e);
- }
- } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
- || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
-
- ApplicationAttemptState attemptState = null;
- if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- attemptState =
- ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
- attemptState =
- ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
- }
-
- Exception storedException = null;
- Credentials credentials = attemptState.getAppAttemptCredentials();
- ByteBuffer appAttemptTokens = null;
- try {
- if (credentials != null) {
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
- ApplicationAttemptStateDataPBImpl attemptStateData =
- (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
- .newApplicationAttemptStateData(attemptState.getAttemptId(),
- attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(),
- attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
- }
- if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
- notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- } else {
- assert event.getType().equals(
- RMStateStoreEventType.UPDATE_APP_ATTEMPT);
- updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
- notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- }
- } catch (Exception e) {
- LOG.error(
- "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
- notifyStoreOperationFailed(e);
- }
- } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
- ApplicationState appState =
- ((RMStateStoreRemoveAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- LOG.info("Removing info for app: " + appId);
- try {
- removeApplicationStateInternal(appState);
- } catch (Exception e) {
- LOG.error("Error removing app: " + appId, e);
- notifyStoreOperationFailed(e);
- }
- } else {
- LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+ try {
+ this.stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -546,12 +548,12 @@ public class ZKRMStateStore extends RMSt
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
- LOG.info("Done Loading applications from ZK state store");
+ LOG.debug("Done Loading applications from ZK state store");
}
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
@@ -565,7 +567,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
@@ -579,7 +581,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
}
@@ -587,7 +589,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appDirPath = getNodePath(rmAppRoot,
appAttemptId.getApplicationId().toString());
@@ -605,7 +607,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
@@ -622,7 +624,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
}
@@ -671,7 +673,7 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
opList.add(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
doMultiWithRetries(opList);
}
@@ -688,7 +690,7 @@ public class ZKRMStateStore extends RMSt
// in case znode doesn't exist
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
- LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
// in case znode exists
addStoreOrUpdateOps(
@@ -770,7 +772,7 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
}
@@ -823,7 +825,7 @@ public class ZKRMStateStore extends RMSt
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
- LOG.info("Session expired");
+ LOG.info("ZKRMStateStore Session expired");
createConnection();
break;
default:
@@ -998,13 +1000,13 @@ public class ZKRMStateStore extends RMSt
throw new StoreFencedException();
}
} catch (KeeperException ke) {
+ LOG.info("Exception while executing a ZK operation.", ke);
if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Waiting for zookeeper to be connected, retry no. + "
- + retry);
+ LOG.info("Retrying operation on ZK. Retry no. " + retry);
Thread.sleep(zkRetryInterval);
continue;
}
- LOG.debug("Error while doing ZK operation.", ke);
+ LOG.info("Maxed out ZK retries. Giving up!");
throw ke;
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Mon Jun 16 18:13:57 2014
@@ -18,31 +18,73 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
/*
* Contains the state data that needs to be persisted for an ApplicationAttempt
*/
@Public
@Unstable
-public interface ApplicationAttemptStateData {
-
+public abstract class ApplicationAttemptStateData {
+  /**
+   * Creates a new record and fills it with the supplied attempt data.
+   *
+   * @param attemptId id of the application attempt
+   * @param container master container running the application attempt
+   * @param attemptTokens application attempt tokens that belong to the attempt
+   * @param startTime start time to record
+   * @param finalState state of the application attempt to record
+   * @param finalTrackingUrl final tracking url of the AM
+   * @param diagnostics diagnostic information of the attempt
+   * @param amUnregisteredFinalStatus final application status to record
+   * @return the populated {@link ApplicationAttemptStateData}
+   */
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptId attemptId, Container container,
+      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+      String finalTrackingUrl, String diagnostics,
+      FinalApplicationStatus amUnregisteredFinalStatus) {
+    final ApplicationAttemptStateData data =
+        Records.newRecord(ApplicationAttemptStateData.class);
+    data.setAttemptId(attemptId);
+    data.setMasterContainer(container);
+    data.setAppAttemptTokens(attemptTokens);
+    data.setState(finalState);
+    data.setFinalTrackingUrl(finalTrackingUrl);
+    data.setDiagnostics(diagnostics);
+    data.setStartTime(startTime);
+    data.setFinalApplicationStatus(amUnregisteredFinalStatus);
+    return data;
+  }
+
+  /**
+   * Creates a new record from an {@link ApplicationAttemptState}.
+   * If the attempt state carries credentials, they are written to an
+   * in-memory buffer and passed on as the attempt tokens; otherwise the
+   * attempt tokens are {@code null}.
+   *
+   * @param attemptState attempt state to copy the values from
+   * @return the populated {@link ApplicationAttemptStateData}
+   * @throws IOException if writing the credentials to the buffer fails
+   */
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptState attemptState) throws IOException {
+    final Credentials creds = attemptState.getAppAttemptCredentials();
+    ByteBuffer serializedTokens = null;
+    if (creds != null) {
+      final DataOutputBuffer buffer = new DataOutputBuffer();
+      creds.writeTokenStorageToStream(buffer);
+      // Wrap only the first getLength() bytes of the backing array.
+      serializedTokens =
+          ByteBuffer.wrap(buffer.getData(), 0, buffer.getLength());
+    }
+    return newInstance(
+        attemptState.getAttemptId(),
+        attemptState.getMasterContainer(),
+        serializedTokens,
+        attemptState.getStartTime(),
+        attemptState.getState(),
+        attemptState.getFinalTrackingUrl(),
+        attemptState.getDiagnostics(),
+        attemptState.getFinalApplicationStatus());
+  }
+
+ public abstract ApplicationAttemptStateDataProto getProto();
+
/**
* The ApplicationAttemptId for the application attempt
* @return ApplicationAttemptId for the application attempt
*/
@Public
@Unstable
- public ApplicationAttemptId getAttemptId();
+ public abstract ApplicationAttemptId getAttemptId();
- public void setAttemptId(ApplicationAttemptId attemptId);
+ public abstract void setAttemptId(ApplicationAttemptId attemptId);
/*
* The master container running the application attempt
@@ -50,9 +92,9 @@ public interface ApplicationAttemptState
*/
@Public
@Unstable
- public Container getMasterContainer();
+ public abstract Container getMasterContainer();
- public void setMasterContainer(Container container);
+ public abstract void setMasterContainer(Container container);
/**
* The application attempt tokens that belong to this attempt
@@ -60,17 +102,17 @@ public interface ApplicationAttemptState
*/
@Public
@Unstable
- public ByteBuffer getAppAttemptTokens();
+ public abstract ByteBuffer getAppAttemptTokens();
- public void setAppAttemptTokens(ByteBuffer attemptTokens);
+ public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
/**
* Get the final state of the application attempt.
* @return the final state of the application attempt.
*/
- public RMAppAttemptState getState();
+ public abstract RMAppAttemptState getState();
- public void setState(RMAppAttemptState state);
+ public abstract void setState(RMAppAttemptState state);
/**
* Get the original not-proxied <em>final tracking url</em> for the
@@ -79,34 +121,34 @@ public interface ApplicationAttemptState
* @return the original not-proxied <em>final tracking url</em> for the
* application
*/
- public String getFinalTrackingUrl();
+ public abstract String getFinalTrackingUrl();
/**
* Set the final tracking Url of the AM.
* @param url
*/
- public void setFinalTrackingUrl(String url);
+ public abstract void setFinalTrackingUrl(String url);
/**
 * Get the <em>diagnostic information</em> of the attempt
 * @return <em>diagnostic information</em> of the attempt
*/
- public String getDiagnostics();
+ public abstract String getDiagnostics();
- public void setDiagnostics(String diagnostics);
+ public abstract void setDiagnostics(String diagnostics);
/**
* Get the <em>start time</em> of the application.
* @return <em>start time</em> of the application
*/
- public long getStartTime();
+ public abstract long getStartTime();
- public void setStartTime(long startTime);
+ public abstract void setStartTime(long startTime);
/**
* Get the <em>final finish status</em> of the application.
* @return <em>final finish status</em> of the application
*/
- public FinalApplicationStatus getFinalApplicationStatus();
+ public abstract FinalApplicationStatus getFinalApplicationStatus();
- public void setFinalApplicationStatus(FinalApplicationStatus finishState);
+ public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Mon Jun 16 18:13:57 2014
@@ -24,7 +24,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Records;
/**
* Contains all the state data that needs to be stored persistently
@@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res
*/
@Public
@Unstable
-public interface ApplicationStateData {
-
+public abstract class ApplicationStateData {
+  /**
+   * Creates a new record and fills it with the supplied application data.
+   *
+   * @param submitTime time at which the application was received by the
+   *          Resource Manager
+   * @param startTime start time of the application
+   * @param user user to record for the application
+   * @param submissionContext submission context of the application
+   * @param state final state of the application
+   * @param diagnostics diagnostics information for the application master
+   * @param finishTime finish time of the application
+   * @return the populated {@link ApplicationStateData}
+   */
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user,
+      ApplicationSubmissionContext submissionContext,
+      RMAppState state, String diagnostics, long finishTime) {
+    final ApplicationStateData data =
+        Records.newRecord(ApplicationStateData.class);
+    data.setSubmitTime(submitTime);
+    data.setStartTime(startTime);
+    data.setUser(user);
+    data.setApplicationSubmissionContext(submissionContext);
+    data.setState(state);
+    data.setDiagnostics(diagnostics);
+    data.setFinishTime(finishTime);
+    return data;
+  }
+
+  /**
+   * Creates a new record by copying the values of an
+   * {@link ApplicationState}.
+   *
+   * @param appState application state to copy the values from
+   * @return the populated {@link ApplicationStateData}
+   */
+  public static ApplicationStateData newInstance(
+      ApplicationState appState) {
+    // Read the values in the same order the factory expects them.
+    final long submitTime = appState.getSubmitTime();
+    final long startTime = appState.getStartTime();
+    final String user = appState.getUser();
+    final ApplicationSubmissionContext context =
+        appState.getApplicationSubmissionContext();
+    final RMAppState state = appState.getState();
+    final String diagnostics = appState.getDiagnostics();
+    final long finishTime = appState.getFinishTime();
+    return newInstance(submitTime, startTime, user, context, state,
+        diagnostics, finishTime);
+  }
+
+ public abstract ApplicationStateDataProto getProto();
+
/**
* The time at which the application was received by the Resource Manager
* @return submitTime
*/
@Public
@Unstable
- public long getSubmitTime();
+ public abstract long getSubmitTime();
@Public
@Unstable
- public void setSubmitTime(long submitTime);
+ public abstract void setSubmitTime(long submitTime);
/**
* Get the <em>start time</em> of the application.
@@ -63,11 +90,11 @@ public interface ApplicationStateData {
*/
@Public
@Unstable
- public void setUser(String user);
+ public abstract void setUser(String user);
@Public
@Unstable
- public String getUser();
+ public abstract String getUser();
/**
* The {@link ApplicationSubmissionContext} for the application
@@ -76,34 +103,34 @@ public interface ApplicationStateData {
*/
@Public
@Unstable
- public ApplicationSubmissionContext getApplicationSubmissionContext();
+ public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
@Public
@Unstable
- public void setApplicationSubmissionContext(
+ public abstract void setApplicationSubmissionContext(
ApplicationSubmissionContext context);
/**
* Get the final state of the application.
* @return the final state of the application.
*/
- public RMAppState getState();
+ public abstract RMAppState getState();
- public void setState(RMAppState state);
+ public abstract void setState(RMAppState state);
/**
* Get the diagnostics information for the application master.
* @return the diagnostics information for the application master.
*/
- public String getDiagnostics();
+ public abstract String getDiagnostics();
- public void setDiagnostics(String diagnostics);
+ public abstract void setDiagnostics(String diagnostics);
/**
* The finish time of the application.
 * @return the finish time of the application.
*/
- public long getFinishTime();
+ public abstract long getFinishTime();
- public void setFinishTime(long finishTime);
+ public abstract void setFinishTime(long finishTime);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Mon Jun 16 18:13:57 2014
@@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-public class ApplicationAttemptStateDataPBImpl
-extends ProtoBase<ApplicationAttemptStateDataProto>
-implements ApplicationAttemptStateData {
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
+public class ApplicationAttemptStateDataPBImpl extends
+ ApplicationAttemptStateData {
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
this.proto = proto;
viaProto = true;
}
-
+
+ @Override
public ApplicationAttemptStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
}
if(this.appAttemptTokens != null) {
- builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
+ builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
+ this.appAttemptTokens));
}
}
@@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
if(!p.hasAppAttemptTokens()) {
return null;
}
- this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
+ this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
+ p.getAppAttemptTokens());
return appAttemptTokens;
}
@@ -249,24 +247,26 @@ implements ApplicationAttemptStateData {
builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
}
- public static ApplicationAttemptStateData newApplicationAttemptStateData(
- ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
- String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus) {
- ApplicationAttemptStateData attemptStateData =
- recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
- attemptStateData.setAttemptId(attemptId);
- attemptStateData.setMasterContainer(container);
- attemptStateData.setAppAttemptTokens(attemptTokens);
- attemptStateData.setState(finalState);
- attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
- attemptStateData.setDiagnostics(diagnostics);
- attemptStateData.setStartTime(startTime);
- attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
- return attemptStateData;
+  /**
+   * Returns the hash code of this record's proto.
+   *
+   * @return hash code of the value returned by {@link #getProto()}
+   */
+  @Override
+  public int hashCode() {
+    final ApplicationAttemptStateDataProto current = getProto();
+    return current.hashCode();
   }
+  /**
+   * Compares this record with another object by comparing their protos.
+   * Two records are equal only if they have the same runtime class and
+   * their {@link #getProto()} values are equal.
+   *
+   * @param other object to compare with, may be {@code null}
+   * @return {@code true} if {@code other} has the same class and an equal
+   *         proto, {@code false} otherwise
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Require the exact same runtime class. The previous check,
+    // other.getClass().isAssignableFrom(this.getClass()), also accepted
+    // instances of any supertype (e.g. a plain Object), and the following
+    // cast to this class then failed with a ClassCastException.
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    return getProto().equals(
+        ((ApplicationAttemptStateDataPBImpl) other).getProto());
+  }
+
+  /**
+   * Returns the short debug string of this record's proto.
+   *
+   * @return {@link TextFormat#shortDebugString} of {@link #getProto()}
+   */
+  @Override
+  public String toString() {
+    final ApplicationAttemptStateDataProto current = getProto();
+    return TextFormat.shortDebugString(current);
+  }
+
private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());