You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/13 17:58:42 UTC
svn commit: r1492721 [1/3] - in
/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/di...
Author: vinodkv
Date: Thu Jun 13 15:58:38 2013
New Revision: 1492721
URL: http://svn.apache.org/r1492721
Log:
YARN-530. Defined Service model strictly, implemented AbstractService for robust subclassing and migrated yarn-common services. Contributed by Steve Loughran.
YARN-117. Migrated rest of YARN to the new service model. Contributed by Steve Louhran.
MAPREDUCE-5298. Moved MapReduce services to YARN-530 stricter lifecycle. Contributed by Steve Loughran.
svn merge --ignore-ancestry -c 1492718 ../../trunk/
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/log4j.properties
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/log4j.properties
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LifecycleEvent.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LifecycleEvent.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java
- copied unchanged from r1492718, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java
Removed:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientTokens.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Thu Jun 13 15:58:38 2013
@@ -88,6 +88,10 @@ Release 2.1.0-beta - UNRELEASED
YARN-642. Removed health parameter from ResourceManager /nodes web-service
and cleaned the behaviour of the status parameter. (Sandy Ryza vid vinodkv)
+ YARN-530. Defined Service model strictly, implemented AbstractService for
+ robust subclassing and migrated yarn-common services. (Steve Loughran via
+ vinodkv)
+
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
@@ -458,6 +462,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-700. TestInfoBlock fails on Windows because of line ending missmatch.
(Ivan Mitic via cnauroth)
+ YARN-117. Migrated rest of YARN to the new service model. (Steve Louhran via
+ vinodkv)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Thu Jun 13 15:58:38 2013
@@ -24,6 +24,11 @@
<Class name="~org\.apache\.hadoop\.yarn\.ipc\.RpcProtos.*" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.service.AbstractService" />
+ <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
+ </Match>
+
<!-- Ignore unchecked Event casts -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl" />
@@ -172,6 +177,11 @@
<Field name="renewalTimer" />
<Bug code="IS"/>
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.client.NMClientImpl$NMCommunicator"/>
+ <Field name="containerManager" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ </Match>
<!-- Don't care if putIfAbsent value is ignored -->
<Match>
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Thu Jun 13 15:58:38 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.applications.distributedshell;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -63,9 +64,16 @@ public class TestDistributedShell {
if (url == null) {
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
}
- yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
+ Configuration yarnClusterConfig = yarnCluster.getConfig();
+ yarnClusterConfig.set("yarn.application.classpath", new File(url.getPath()).getParent());
+ //write the document to a buffer (not directly to the file, as that
+ //can cause the file being written to get read -which will then fail.
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ yarnClusterConfig.writeXml(bytesOut);
+ bytesOut.close();
+ //write the bytes to the file in the classpath
OutputStream os = new FileOutputStream(new File(url.getPath()));
- yarnCluster.getConfig().writeXml(os);
+ os.write(bytesOut.toByteArray());
os.close();
}
try {
@@ -78,8 +86,11 @@ public class TestDistributedShell {
@AfterClass
public static void tearDown() throws IOException {
if (yarnCluster != null) {
- yarnCluster.stop();
- yarnCluster = null;
+ try {
+ yarnCluster.stop();
+ } finally {
+ yarnCluster = null;
+ }
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Thu Jun 13 15:58:38 2013
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -54,14 +56,30 @@ public class TestUnmanagedAMLauncher {
TestUnmanagedAMLauncher.class.getSimpleName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
+ //get the address
+ Configuration yarnClusterConfig = yarnCluster.getConfig();
+ LOG.info("MiniYARN ResourceManager published address: " +
+ yarnClusterConfig.get(YarnConfiguration.RM_ADDRESS));
+ LOG.info("MiniYARN ResourceManager published web address: " +
+ yarnClusterConfig.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+ String webapp = yarnClusterConfig.get(YarnConfiguration.RM_WEBAPP_ADDRESS);
+ assertTrue("Web app address still unbound to a host at " + webapp,
+ !webapp.startsWith("0.0.0.0"));
+ LOG.info("Yarn webapp is at "+ webapp);
URL url = Thread.currentThread().getContextClassLoader()
.getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException(
"Could not find 'yarn-site.xml' dummy file in classpath");
}
+ //write the document to a buffer (not directly to the file, as that
+ //can cause the file being written to get read -which will then fail.
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ yarnClusterConfig.writeXml(bytesOut);
+ bytesOut.close();
+ //write the bytes to the file in the classpath
OutputStream os = new FileOutputStream(new File(url.getPath()));
- yarnCluster.getConfig().writeXml(os);
+ os.write(bytesOut.toByteArray());
os.close();
}
try {
@@ -74,8 +92,11 @@ public class TestUnmanagedAMLauncher {
@AfterClass
public static void tearDown() throws IOException {
if (yarnCluster != null) {
- yarnCluster.stop();
- yarnCluster = null;
+ try {
+ yarnCluster.stop();
+ } finally {
+ yarnCluster = null;
+ }
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Thu Jun 13 15:58:38 2013
@@ -132,16 +132,16 @@ public class AMRMClientAsync<T extends C
}
@Override
- public void init(Configuration conf) {
- super.init(conf);
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
client.init(conf);
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
handlerThread.start();
client.start();
- super.start();
+ super.serviceStart();
}
/**
@@ -150,7 +150,7 @@ public class AMRMClientAsync<T extends C
* deadlock, and thus should be avoided.
*/
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (Thread.currentThread() == handlerThread) {
throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
}
@@ -167,7 +167,7 @@ public class AMRMClientAsync<T extends C
} catch (InterruptedException ex) {
LOG.error("Error joining with hander thread", ex);
}
- super.stop();
+ super.serviceStop();
}
public void setHeartbeatInterval(int interval) {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Thu Jun 13 15:58:38 2013
@@ -151,12 +151,12 @@ public class AMRMClientImpl<T extends Co
}
@Override
- public synchronized void init(Configuration conf) {
- super.init(conf);
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
final YarnRPC rpc = YarnRPC.create(conf);
final InetSocketAddress rmAddress = conf.getSocketAddr(
@@ -180,15 +180,15 @@ public class AMRMClientImpl<T extends Co
}
});
LOG.debug("Connecting to ResourceManager at " + rmAddress);
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java Thu Jun 13 15:58:38 2013
@@ -166,18 +166,18 @@ public class NMClientAsync extends Abstr
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.maxThreadPoolSize = conf.getInt(
YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
client.init(conf);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
client.start();
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
@@ -243,31 +243,39 @@ public class NMClientAsync extends Abstr
eventDispatcherThread.setDaemon(false);
eventDispatcherThread.start();
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
- eventDispatcherThread.interrupt();
- try {
- eventDispatcherThread.join();
- } catch (InterruptedException e) {
- LOG.error("The thread of " + eventDispatcherThread.getName() +
- " didn't finish normally.", e);
+ if (eventDispatcherThread != null) {
+ eventDispatcherThread.interrupt();
+ try {
+ eventDispatcherThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("The thread of " + eventDispatcherThread.getName() +
+ " didn't finish normally.", e);
+ }
}
- threadPool.shutdownNow();
- // If NMClientImpl doesn't stop running containers, the states doesn't
- // need to be cleared.
- if (!(client instanceof NMClientImpl) ||
- ((NMClientImpl) client).cleanupRunningContainers.get()) {
- containers.clear();
+ if (threadPool != null) {
+ threadPool.shutdownNow();
+ }
+ if (client != null) {
+ // If NMClientImpl doesn't stop running containers, the states doesn't
+ // need to be cleared.
+ if (!(client instanceof NMClientImpl) ||
+ ((NMClientImpl) client).cleanupRunningContainers.get()) {
+ if (containers != null) {
+ containers.clear();
+ }
+ }
+ client.stop();
}
- client.stop();
- super.stop();
+ super.serviceStop();
}
public void startContainer(
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java Thu Jun 13 15:58:38 2013
@@ -86,7 +86,7 @@ public class NMClientImpl extends Abstra
new ConcurrentHashMap<ContainerId, StartedContainer>();
//enabled by default
- protected AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
public NMClientImpl() {
super(NMClientImpl.class.getName());
@@ -97,13 +97,13 @@ public class NMClientImpl extends Abstra
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
// Usually, started-containers are stopped when this client stops. Unless
// the flag cleanupRunningContainers is set to false.
if (cleanupRunningContainers.get()) {
cleanupRunningContainers();
}
- super.stop();
+ super.serviceStop();
}
protected synchronized void cleanupRunningContainers() {
@@ -171,7 +171,7 @@ public class NMClientImpl extends Abstra
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
final YarnRPC rpc = YarnRPC.create(getConfig());
final InetSocketAddress containerAddress =
@@ -195,10 +195,11 @@ public class NMClientImpl extends Abstra
});
LOG.debug("Connecting to ContainerManager at " + containerAddress);
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.containerManager != null) {
RPC.stopProxy(this.containerManager);
@@ -209,6 +210,7 @@ public class NMClientImpl extends Abstra
containerAddress);
}
}
+ super.serviceStop();
}
public synchronized Map<String, ByteBuffer> startContainer(
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java Thu Jun 13 15:58:38 2013
@@ -79,7 +79,11 @@ public class YarnClientImpl extends Abst
}
public YarnClientImpl(InetSocketAddress rmAddress) {
- super(YarnClientImpl.class.getName());
+ this(YarnClientImpl.class.getName(), rmAddress);
+ }
+
+ public YarnClientImpl(String name, InetSocketAddress rmAddress) {
+ super(name);
this.rmAddress = rmAddress;
}
@@ -89,18 +93,18 @@ public class YarnClientImpl extends Abst
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
statePollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(getConfig());
this.rmClient = (ClientRMProtocol) rpc.getProxy(
@@ -108,15 +112,15 @@ public class YarnClientImpl extends Abst
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to ResourceManager at " + rmAddress);
}
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java Thu Jun 13 15:58:38 2013
@@ -153,24 +153,37 @@ public class TestNMClient {
@After
public void tearDown() {
rmClient.stop();
+ yarnClient.stop();
+ yarnCluster.stop();
+ }
+ private void stopNmClient(boolean stopContainers) {
+ assertNotNull("Null nmClient", nmClient);
// leave one unclosed
assertEquals(1, nmClient.startedContainers.size());
// default true
assertTrue(nmClient.cleanupRunningContainers.get());
- // don't stop the running containers
- nmClient.cleanupRunningContainersOnStop(false);
- assertFalse(nmClient.cleanupRunningContainers.get());
- nmClient.stop();
- assertTrue(nmClient.startedContainers.size() > 0);
- // stop the running containers
- nmClient.cleanupRunningContainersOnStop(true);
- assertTrue(nmClient.cleanupRunningContainers.get());
+ nmClient.cleanupRunningContainersOnStop(stopContainers);
+ assertEquals(stopContainers, nmClient.cleanupRunningContainers.get());
nmClient.stop();
- assertEquals(0, nmClient.startedContainers.size());
+ }
- yarnClient.stop();
- yarnCluster.stop();
+ @Test (timeout = 60000)
+ public void testNMClientNoCleanupOnStop()
+ throws YarnException, IOException {
+
+ rmClient.registerApplicationMaster("Host", 10000, "");
+
+ testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+ // don't stop the running containers
+ stopNmClient(false);
+ assertFalse(nmClient.startedContainers. isEmpty());
+ //now cleanup
+ nmClient.cleanupRunningContainers();
+ assertEquals(0, nmClient.startedContainers.size());
}
@Test (timeout = 60000)
@@ -183,6 +196,11 @@ public class TestNMClient {
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
+ // stop the running containers on close
+ assertFalse(nmClient.startedContainers.isEmpty());
+ nmClient.cleanupRunningContainersOnStop(true);
+ assertTrue(nmClient.cleanupRunningContainers.get());
+ nmClient.stop();
}
private Set<Container> allocateContainers(
@@ -250,9 +268,12 @@ public class TestNMClient {
container.getContainerToken());
fail("Exception is expected");
} catch (YarnException e) {
- assertTrue("The thrown exception is not expected",
- e.getMessage().contains(
- "is either not started yet or already stopped"));
+ if (!e.getMessage()
+ .contains("is either not started yet or already stopped")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e).initCause(
+ e));
+ }
}
Credentials ts = new Credentials();
@@ -266,7 +287,8 @@ public class TestNMClient {
try {
nmClient.startContainer(container, clc);
} catch (YarnException e) {
- fail("Exception is not expected");
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e).initCause(e));
}
// leave one container unclosed
@@ -279,7 +301,9 @@ public class TestNMClient {
nmClient.stopContainer(container.getId(), container.getNodeId(),
container.getContainerToken());
} catch (YarnException e) {
- fail("Exception is not expected");
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
}
// getContainerStatus can be called after stopContainer
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java Thu Jun 13 15:58:38 2013
@@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.service.ServiceOperations;
+import org.junit.After;
import org.junit.Test;
@@ -64,6 +66,11 @@ public class TestNMClientAsync {
private NodeId nodeId;
private Token containerToken;
+ @After
+ public void teardown() {
+ ServiceOperations.stop(asyncClient);
+ }
+
@Test (timeout = 30000)
public void testNMClientAsync() throws Exception {
Configuration conf = new Configuration();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Thu Jun 13 15:58:38 2013
@@ -82,24 +82,24 @@ public class AsyncDispatcher extends Abs
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
//start all the components
- super.start();
+ super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
@@ -111,7 +111,7 @@ public class AsyncDispatcher extends Abs
}
// stop all the components
- super.stop();
+ super.serviceStop();
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Thu Jun 13 15:58:38 2013
@@ -125,8 +125,9 @@ public class AggregatedLogDeletionServic
public AggregatedLogDeletionService() {
super(AggregatedLogDeletionService.class.getName());
}
-
- public void start() {
+
+ @Override
+ protected void serviceStart() throws Exception {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -150,14 +151,14 @@ public class AggregatedLogDeletionServic
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if(timer != null) {
timer.cancel();
}
- super.stop();
+ super.serviceStop();
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java Thu Jun 13 15:58:38 2013
@@ -18,26 +18,33 @@
package org.apache.hadoop.yarn.service;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+/**
+ * This is the base implementation class for YARN services.
+ */
public abstract class AbstractService implements Service {
private static final Log LOG = LogFactory.getLog(AbstractService.class);
/**
- * Service state: initially {@link STATE#NOTINITED}.
- */
- private STATE state = STATE.NOTINITED;
-
- /**
* Service name.
*/
private final String name;
+
+ /** service state */
+ private final ServiceStateModel stateModel;
+
/**
* Service start time. Will be zero until the service is started.
*/
@@ -46,83 +53,341 @@ public abstract class AbstractService im
/**
* The configuration. Will be null until the service is initialized.
*/
- private Configuration config;
+ private volatile Configuration config;
/**
* List of state change listeners; it is final to ensure
* that it will never be null.
*/
- private List<ServiceStateChangeListener> listeners =
- new ArrayList<ServiceStateChangeListener>();
+ private final ServiceOperations.ServiceListeners listeners
+ = new ServiceOperations.ServiceListeners();
+ /**
+ * Static listeners to all events across all services
+ */
+ private static ServiceOperations.ServiceListeners globalListeners
+ = new ServiceOperations.ServiceListeners();
/**
+ * The cause of any failure -will be null.
+ * if a service did not stop due to a failure.
+ */
+ private Exception failureCause;
+
+ /**
+ * the state in which the service was when it failed.
+ * Only valid when the service is stopped due to a failure
+ */
+ private STATE failureState = null;
+
+ /**
+ * object used to co-ordinate {@link #waitForServiceToStop(long)}
+ * across threads.
+ */
+ private final AtomicBoolean terminationNotification =
+ new AtomicBoolean(false);
+
+ /**
+ * History of lifecycle transitions
+ */
+ private final List<LifecycleEvent> lifecycleHistory
+ = new ArrayList<LifecycleEvent>(5);
+
+ /**
+ * Map of blocking dependencies
+ */
+ private final Map<String,String> blockerMap = new HashMap<String, String>();
+
+ private final Object stateChangeLock = new Object();
+
+ /**
* Construct the service.
* @param name service name
*/
public AbstractService(String name) {
this.name = name;
+ stateModel = new ServiceStateModel(name);
}
@Override
- public synchronized STATE getServiceState() {
- return state;
+ public final STATE getServiceState() {
+ return stateModel.getState();
+ }
+
+ @Override
+ public final synchronized Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ @Override
+ public synchronized STATE getFailureState() {
+ return failureState;
+ }
+
+ /**
+ * Set the configuration for this service.
+ * This method is called during {@link #init(Configuration)}
+ * and should only be needed if for some reason a service implementation
+ * needs to override that initial setting -for example replacing
+ * it with a new subclass of {@link Configuration}
+ * @param conf new configuration.
+ */
+ protected void setConfig(Configuration conf) {
+ this.config = conf;
}
/**
* {@inheritDoc}
- * @throws IllegalStateException if the current service state does not permit
- * this action
+ * This invokes {@link #serviceInit}
+ * @param conf the configuration of the service. This must not be null
+ * @throws ServiceStateException if the configuration was null,
+ * the state change not permitted, or something else went wrong
*/
@Override
- public synchronized void init(Configuration conf) {
- ensureCurrentState(STATE.NOTINITED);
- this.config = conf;
- changeState(STATE.INITED);
- LOG.info("Service:" + getName() + " is inited.");
+ public void init(Configuration conf) {
+ if (conf == null) {
+ throw new ServiceStateException("Cannot initialize service "
+ + getName() + ": null configuration");
+ }
+ if (isInState(STATE.INITED)) {
+ return;
+ }
+ synchronized (stateChangeLock) {
+ if (enterState(STATE.INITED) != STATE.INITED) {
+ setConfig(conf);
+ try {
+ serviceInit(config);
+ if (isInState(STATE.INITED)) {
+ //if the service ended up here during init,
+ //notify the listeners
+ notifyListeners();
+ }
+ } catch (Exception e) {
+ noteFailure(e);
+ ServiceOperations.stopQuietly(LOG, this);
+ throw ServiceStateException.convert(e);
+ }
+ }
+ }
}
/**
* {@inheritDoc}
- * @throws IllegalStateException if the current service state does not permit
+ * @throws ServiceStateException if the current service state does not permit
* this action
*/
@Override
- public synchronized void start() {
- startTime = System.currentTimeMillis();
- ensureCurrentState(STATE.INITED);
- changeState(STATE.STARTED);
- LOG.info("Service:" + getName() + " is started.");
+ public void start() {
+ if (isInState(STATE.STARTED)) {
+ return;
+ }
+ //enter the started state
+ synchronized (stateChangeLock) {
+ if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
+ try {
+ startTime = System.currentTimeMillis();
+ serviceStart();
+ if (isInState(STATE.STARTED)) {
+ //if the service started (and isn't now in a later state), notify
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Service " + getName() + " is started");
+ }
+ notifyListeners();
+ }
+ } catch (Exception e) {
+ noteFailure(e);
+ ServiceOperations.stopQuietly(LOG, this);
+ throw ServiceStateException.convert(e);
+ }
+ }
+ }
}
/**
* {@inheritDoc}
- * @throws IllegalStateException if the current service state does not permit
- * this action
*/
@Override
- public synchronized void stop() {
- if (state == STATE.STOPPED ||
- state == STATE.INITED ||
- state == STATE.NOTINITED) {
- // already stopped, or else it was never
- // started (eg another service failing canceled startup)
+ public void stop() {
+ if (isInState(STATE.STOPPED)) {
return;
}
- ensureCurrentState(STATE.STARTED);
- changeState(STATE.STOPPED);
- LOG.info("Service:" + getName() + " is stopped.");
+ synchronized (stateChangeLock) {
+ if (enterState(STATE.STOPPED) != STATE.STOPPED) {
+ try {
+ serviceStop();
+ } catch (Exception e) {
+ //stop-time exceptions are logged if they are the first one,
+ noteFailure(e);
+ throw ServiceStateException.convert(e);
+ } finally {
+ //report that the service has terminated
+ terminationNotification.set(true);
+ synchronized (terminationNotification) {
+ terminationNotification.notifyAll();
+ }
+ //notify anything listening for events
+ notifyListeners();
+ }
+ } else {
+ //already stopped: note it
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring re-entrant call to stop()");
+ }
+ }
+ }
+ }
+
+ /**
+ * Relay to {@link #stop()}
+ * @throws IOException
+ */
+ @Override
+ public final void close() throws IOException {
+ stop();
+ }
+
+ /**
+ * Failure handling: record the exception
+ * that triggered it -if there was not one already.
+ * Services are free to call this themselves.
+ * @param exception the exception
+ */
+ protected final void noteFailure(Exception exception) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("noteFailure " + exception, null);
+ }
+ if (exception == null) {
+ //make sure failure logic doesn't itself cause problems
+ return;
+ }
+ //record the failure details, and log it
+ synchronized (this) {
+ if (failureCause == null) {
+ failureCause = exception;
+ failureState = getServiceState();
+ LOG.info("Service " + getName()
+ + " failed in state " + failureState
+ + "; cause: " + exception,
+ exception);
+ }
+ }
}
@Override
- public synchronized void register(ServiceStateChangeListener l) {
+ public final boolean waitForServiceToStop(long timeout) {
+ boolean completed = terminationNotification.get();
+ while (!completed) {
+ try {
+ synchronized(terminationNotification) {
+ terminationNotification.wait(timeout);
+ }
+ // here there has been a timeout, the object has terminated,
+ // or there has been a spurious wakeup (which we ignore)
+ completed = true;
+ } catch (InterruptedException e) {
+ // interrupted; have another look at the flag
+ completed = terminationNotification.get();
+ }
+ }
+ return terminationNotification.get();
+ }
+
+ /* ===================================================================== */
+ /* Override Points */
+ /* ===================================================================== */
+
+ /**
+ * All initialization code needed by a service.
+ *
+ * This method will only ever be called once during the lifecycle of
+ * a specific service instance.
+ *
+ * Implementations do not need to be synchronized as the logic
+ * in {@link #init(Configuration)} prevents re-entrancy.
+ *
+ * The base implementation checks to see if the subclass has created
+ * a new configuration instance, and if so, updates the base class value
+ * @param conf configuration
+ * @throws Exception on a failure -these will be caught,
+ * possibly wrapped, and wil; trigger a service stop
+ */
+ protected void serviceInit(Configuration conf) throws Exception {
+ if (conf != config) {
+ LOG.debug("Config has been overridden during init");
+ setConfig(conf);
+ }
+ }
+
+ /**
+ * Actions called during the INITED to STARTED transition.
+ *
+ * This method will only ever be called once during the lifecycle of
+ * a specific service instance.
+ *
+ * Implementations do not need to be synchronized as the logic
+ * in {@link #start()} prevents re-entrancy.
+ *
+ * @throws Exception if needed -these will be caught,
+ * wrapped, and trigger a service stop
+ */
+ protected void serviceStart() throws Exception {
+
+ }
+
+ /**
+ * Actions called during the transition to the STOPPED state.
+ *
+ * This method will only ever be called once during the lifecycle of
+ * a specific service instance.
+ *
+ * Implementations do not need to be synchronized as the logic
+ * in {@link #stop()} prevents re-entrancy.
+ *
+ * Implementations MUST write this to be robust against failures, including
+ * checks for null references -and for the first failure to not stop other
+ * attempts to shut down parts of the service.
+ *
+ * @throws Exception if needed -these will be caught and logged.
+ */
+ protected void serviceStop() throws Exception {
+
+ }
+
+ @Override
+ public void register(ServiceStateChangeListener l) {
listeners.add(l);
}
@Override
- public synchronized void unregister(ServiceStateChangeListener l) {
+ public void unregister(ServiceStateChangeListener l) {
listeners.remove(l);
}
+ /**
+ * Register a global listener, which receives notifications
+ * from the state change events of all services in the JVM
+ * @param l listener
+ */
+ public static void registerGlobalListener(ServiceStateChangeListener l) {
+ globalListeners.add(l);
+ }
+
+ /**
+ * unregister a global listener.
+ * @param l listener to unregister
+ * @return true if the listener was found (and then deleted)
+ */
+ public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
+ return globalListeners.remove(l);
+ }
+
+ /**
+ * Package-scoped method for testing -resets the global listener list
+ */
+ @VisibleForTesting
+ static void resetGlobalListeners() {
+ globalListeners.reset();
+ }
+
@Override
public String getName() {
return name;
@@ -139,28 +404,92 @@ public abstract class AbstractService im
}
/**
- * Verify that that a service is in a given state.
- * @param currentState the desired state
- * @throws IllegalStateException if the service state is different from
- * the desired state
- */
- private void ensureCurrentState(STATE currentState) {
- ServiceOperations.ensureCurrentState(state, currentState);
+ * Notify local and global listeners of state changes.
+ * Exceptions raised by listeners are NOT passed up.
+ */
+ private void notifyListeners() {
+ try {
+ listeners.notifyListeners(this);
+ globalListeners.notifyListeners(this);
+ } catch (Throwable e) {
+ LOG.warn("Exception while notifying listeners of " + this + ": " + e,
+ e);
+ }
+ }
+
+ /**
+ * Add a state change event to the lifecycle history
+ */
+ private void recordLifecycleEvent() {
+ LifecycleEvent event = new LifecycleEvent();
+ event.time = System.currentTimeMillis();
+ event.state = getServiceState();
+ lifecycleHistory.add(event);
+ }
+
+ @Override
+ public synchronized List<LifecycleEvent> getLifecycleHistory() {
+ return new ArrayList<LifecycleEvent>(lifecycleHistory);
+ }
+
+ /**
+ * Enter a state; record this via {@link #recordLifecycleEvent}
+ * and log at the info level.
+ * @param newState the proposed new state
+ * @return the original state
+ * it wasn't already in that state, and the state model permits state re-entrancy.
+ */
+ private STATE enterState(STATE newState) {
+ assert stateModel != null : "null state in " + name + " " + this.getClass();
+ STATE oldState = stateModel.enterState(newState);
+ if (oldState != newState) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Service: " + getName() + " entered state " + getServiceState());
+ }
+ recordLifecycleEvent();
+ }
+ return oldState;
+ }
+
+ @Override
+ public final boolean isInState(Service.STATE expected) {
+ return stateModel.isInState(expected);
+ }
+
+ @Override
+ public String toString() {
+ return "Service " + name + " in state " + stateModel;
+ }
+
+ /**
+ * Put a blocker to the blocker map -replacing any
+ * with the same name.
+ * @param name blocker name
+ * @param details any specifics on the block. This must be non-null.
+ */
+ protected void putBlocker(String name, String details) {
+ synchronized (blockerMap) {
+ blockerMap.put(name, details);
+ }
}
/**
- * Change to a new state and notify all listeners.
- * This is a private method that is only invoked from synchronized methods,
- * which avoid having to clone the listener list. It does imply that
- * the state change listener methods should be short lived, as they
- * will delay the state transition.
- * @param newState new service state
- */
- private void changeState(STATE newState) {
- state = newState;
- //notify listeners
- for (ServiceStateChangeListener l : listeners) {
- l.stateChanged(this);
+ * Remove a blocker from the blocker map -
+ * this is a no-op if the blocker is not present
+ * @param name the name of the blocker
+ */
+ public void removeBlocker(String name) {
+ synchronized (blockerMap) {
+ blockerMap.remove(name);
+ }
+ }
+
+ @Override
+ public Map<String, String> getBlockers() {
+ synchronized (blockerMap) {
+ Map<String, String> map = new HashMap<String, String>(blockerMap);
+ return map;
}
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java Thu Jun 13 15:58:38 2013
@@ -19,14 +19,12 @@
package org.apache.hadoop.yarn.service;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.YarnRuntimeException;
/**
* Composition of services.
@@ -35,72 +33,115 @@ public class CompositeService extends Ab
private static final Log LOG = LogFactory.getLog(CompositeService.class);
- private List<Service> serviceList = new ArrayList<Service>();
+ /**
+ * Policy on shutdown: attempt to close everything (purest) or
+ * only try to close started services (which assumes
+ * that the service implementations may not handle the stop() operation
+ * except when started.
+ * Irrespective of this policy, if a child service fails during
+ * its init() or start() operations, it will have stop() called on it.
+ */
+ protected static final boolean STOP_ONLY_STARTED_SERVICES = false;
+
+ private final List<Service> serviceList = new ArrayList<Service>();
public CompositeService(String name) {
super(name);
}
- public Collection<Service> getServices() {
- return Collections.unmodifiableList(serviceList);
+ /**
+ * Get an unmodifiable list of services
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ public List<Service> getServices() {
+ synchronized (serviceList) {
+ return Collections.unmodifiableList(serviceList);
+ }
}
- protected synchronized void addService(Service service) {
- serviceList.add(service);
+ protected void addService(Service service) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding service " + service.getName());
+ }
+ synchronized (serviceList) {
+ serviceList.add(service);
+ }
}
protected synchronized boolean removeService(Service service) {
- return serviceList.remove(service);
+ synchronized (serviceList) {
+ return serviceList.add(service);
+ }
}
- public synchronized void init(Configuration conf) {
- for (Service service : serviceList) {
+ protected void serviceInit(Configuration conf) throws Exception {
+ List<Service> services = getServices();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": initing services, size=" + services.size());
+ }
+ for (Service service : services) {
service.init(conf);
}
- super.init(conf);
+ super.serviceInit(conf);
}
- public synchronized void start() {
- int i = 0;
- try {
- for (int n = serviceList.size(); i < n; i++) {
- Service service = serviceList.get(i);
- service.start();
- }
- super.start();
- } catch (Throwable e) {
- LOG.error("Error starting services " + getName(), e);
- // Note that the state of the failed service is still INITED and not
- // STARTED. Even though the last service is not started completely, still
- // call stop() on all services including failed service to make sure cleanup
- // happens.
- stop(i);
- throw new YarnRuntimeException("Failed to Start " + getName(), e);
+ protected void serviceStart() throws Exception {
+ List<Service> services = getServices();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": starting services, size=" + services.size());
+ }
+ for (Service service : services) {
+ // start the service. If this fails that service
+ // will be stopped and an exception raised
+ service.start();
}
+ super.serviceStart();
+ }
+ protected void serviceStop() throws Exception {
+ //stop all services that were started
+ int numOfServicesToStop = serviceList.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop);
+ }
+ stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
+ super.serviceStop();
}
- public synchronized void stop() {
- if (this.getServiceState() == STATE.STOPPED) {
- // The base composite-service is already stopped, don't do anything again.
- return;
- }
- if (serviceList.size() > 0) {
- stop(serviceList.size() - 1);
- }
- super.stop();
- }
-
- private synchronized void stop(int numOfServicesStarted) {
- // stop in reserve order of start
- for (int i = numOfServicesStarted; i >= 0; i--) {
- Service service = serviceList.get(i);
- try {
- service.stop();
- } catch (Throwable t) {
- LOG.info("Error stopping " + service.getName(), t);
+ /**
+ * Stop the services in reverse order
+ *
+ * @param numOfServicesStarted index from where the stop should work
+ * @param stopOnlyStartedServices flag to say "only start services that are
+ * started, not those that are NOTINITED or INITED.
+ * @throws RuntimeException the first exception raised during the
+ * stop process -<i>after all services are stopped</i>
+ */
+ private synchronized void stop(int numOfServicesStarted,
+ boolean stopOnlyStartedServices) {
+ // stop in reverse order of start
+ Exception firstException = null;
+ List<Service> services = getServices();
+ for (int i = numOfServicesStarted - 1; i >= 0; i--) {
+ Service service = services.get(i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping service #" + i + ": " + service);
+ }
+ STATE state = service.getServiceState();
+ //depending on the stop police
+ if (state == STATE.STARTED
+ || (!stopOnlyStartedServices && state == STATE.INITED)) {
+ Exception ex = ServiceOperations.stopQuietly(LOG, service);
+ if (ex != null && firstException == null) {
+ firstException = ex;
+ }
}
}
+ //after stopping all services, rethrow the first exception raised
+ if (firstException != null) {
+ throw ServiceStateException.convert(firstException);
+ }
}
/**
@@ -117,13 +158,8 @@ public class CompositeService extends Ab
@Override
public void run() {
- try {
- // Stop the Composite Service
- compositeService.stop();
- } catch (Throwable t) {
- LOG.info("Error stopping " + compositeService.getName(), t);
- }
+ ServiceOperations.stopQuietly(compositeService);
}
}
-
+
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java Thu Jun 13 15:58:38 2013
@@ -20,6 +20,10 @@ package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
public class FilterService implements Service {
private final Service service;
@@ -45,6 +49,11 @@ public class FilterService implements Se
}
@Override
+ public void close() throws IOException {
+ service.close();
+ }
+
+ @Override
public void register(ServiceStateChangeListener listener) {
service.register(listener);
}
@@ -73,4 +82,34 @@ public class FilterService implements Se
public long getStartTime() {
return startTime;
}
+
+ @Override
+ public boolean isInState(STATE state) {
+ return service.isInState(state);
+ }
+
+ @Override
+ public Throwable getFailureCause() {
+ return service.getFailureCause();
+ }
+
+ @Override
+ public STATE getFailureState() {
+ return service.getFailureState();
+ }
+
+ @Override
+ public boolean waitForServiceToStop(long timeout) {
+ return service.waitForServiceToStop(timeout);
+ }
+
+ @Override
+ public List<LifecycleEvent> getLifecycleHistory() {
+ return service.getLifecycleHistory();
+ }
+
+ @Override
+ public Map<String, String> getBlockers() {
+ return service.getBlockers();
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java Thu Jun 13 15:58:38 2013
@@ -20,34 +20,77 @@ package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
* Service LifeCycle.
*/
-public interface Service {
+public interface Service extends Closeable {
/**
* Service states
*/
public enum STATE {
/** Constructed but not initialized */
- NOTINITED,
+ NOTINITED(0, "NOTINITED"),
/** Initialized but not started or stopped */
- INITED,
+ INITED(1, "INITED"),
/** started and not stopped */
- STARTED,
+ STARTED(2, "STARTED"),
/** stopped. No further state transitions are permitted */
- STOPPED
+ STOPPED(3, "STOPPED");
+
+ /**
+ * An integer value for use in array lookup and JMX interfaces.
+ * Although {@link Enum#ordinal()} could do this, explicitly
+ * identify the numbers gives more stability guarantees over time.
+ */
+ private final int value;
+
+ /**
+ * A name of the state that can be used in messages
+ */
+ private final String statename;
+
+ private STATE(int value, String name) {
+ this.value = value;
+ this.statename = name;
+ }
+
+ /**
+ * Get the integer value of a state
+ * @return the numeric value of the state
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Get the name of a state
+ * @return the state's name
+ */
+ @Override
+ public String toString() {
+ return statename;
+ }
}
/**
* Initialize the service.
*
- * The transition must be from {@link STATE#NOTINITED} to {@link STATE#INITED}
- * unless the operation failed and an exception was raised.
+ * The transition MUST be from {@link STATE#NOTINITED} to {@link STATE#INITED}
+ * unless the operation failed and an exception was raised, in which case
+ * {@link #stop()} MUST be invoked and the service enter the state
+ * {@link STATE#STOPPED}.
* @param config the configuration of the service
+ * @throws RuntimeException on any failure during the operation
+
*/
void init(Configuration config);
@@ -55,21 +98,37 @@ public interface Service {
/**
* Start the service.
*
- * The transition should be from {@link STATE#INITED} to {@link STATE#STARTED}
- * unless the operation failed and an exception was raised.
+ * The transition MUST be from {@link STATE#INITED} to {@link STATE#STARTED}
+ * unless the operation failed and an exception was raised, in which case
+ * {@link #stop()} MUST be invoked and the service enter the state
+ * {@link STATE#STOPPED}.
+ * @throws RuntimeException on any failure during the operation
*/
void start();
/**
- * Stop the service.
+ * Stop the service. This MUST be a no-op if the service is already
+ * in the {@link STATE#STOPPED} state. It SHOULD be a best-effort attempt
+ * to stop all parts of the service.
*
- * This operation must be designed to complete regardless of the initial state
- * of the service, including the state of all its internal fields.
+ * The implementation must be designed to complete regardless of the service
+ * state, including the initialized/uninitialized state of all its internal
+ * fields.
+ * @throws RuntimeException on any failure during the stop operation
*/
void stop();
/**
+ * A version of stop() that is designed to be usable in Java7 closure
+ * clauses.
+ * Implementation classes MUST relay this directly to {@link #stop()}
+ * @throws IOException never
+ * @throws RuntimeException on any failure during the stop operation
+ */
+ void close() throws IOException;
+
+ /**
* Register an instance of the service state change events.
* @param listener a new listener
*/
@@ -108,4 +167,52 @@ public interface Service {
* has not yet been started.
*/
long getStartTime();
+
+ /**
+ * Query to see if the service is in a specific state.
+ * In a multi-threaded system, the state may not hold for very long.
+ * @param state the expected state
+ * @return true if, at the time of invocation, the service was in that state.
+ */
+ boolean isInState(STATE state);
+
+ /**
+ * Get the first exception raised during the service failure. If null,
+ * no exception was logged
+ * @return the failure logged during a transition to the stopped state
+ */
+ Throwable getFailureCause();
+
+ /**
+ * Get the state in which the failure in {@link #getFailureCause()} occurred.
+ * @return the state or null if there was no failure
+ */
+ STATE getFailureState();
+
+ /**
+ * Block waiting for the service to stop; uses the termination notification
+ * object to do so.
+ *
+ * This method will only return after all the service stop actions
+ * have been executed (to success or failure), or the timeout elapsed
+ * This method can be called before the service is inited or started; this is
+ * to eliminate any race condition with the service stopping before
+ * this event occurs.
+ * @param timeout timeout in milliseconds. A value of zero means "forever"
+ * @return true iff the service stopped in the time period
+ */
+ boolean waitForServiceToStop(long timeout);
+
+ /**
+ * Get a snapshot of the lifecycle history; it is a static list
+ * @return a possibly empty but never null list of lifecycle events.
+ */
+ public List<LifecycleEvent> getLifecycleHistory();
+
+ /**
+ * Get the blockers on a service -remote dependencies
+ * that are stopping the service from being <i>live</i>.
+ * @return a (snapshotted) map of blocker name->description values
+ */
+ public Map<String, String> getBlockers();
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java Thu Jun 13 15:58:38 2013
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
/**
* This class contains a set of methods to work with services, especially
@@ -33,74 +38,6 @@ public final class ServiceOperations {
}
/**
- * Verify that that a service is in a given state.
- * @param state the actual state a service is in
- * @param expectedState the desired state
- * @throws IllegalStateException if the service state is different from
- * the desired state
- */
- public static void ensureCurrentState(Service.STATE state,
- Service.STATE expectedState) {
- if (state != expectedState) {
- throw new IllegalStateException("For this operation, the " +
- "current service state must be "
- + expectedState
- + " instead of " + state);
- }
- }
-
- /**
- * Initialize a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#NOTINITED}
- * @param configuration the configuration to initialize the service with
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
-
- public static void init(Service service, Configuration configuration) {
- Service.STATE state = service.getServiceState();
- ensureCurrentState(state, Service.STATE.NOTINITED);
- service.init(configuration);
- }
-
- /**
- * Start a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#INITED}
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
-
- public static void start(Service service) {
- Service.STATE state = service.getServiceState();
- ensureCurrentState(state, Service.STATE.INITED);
- service.start();
- }
-
- /**
- * Initialize then start a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#NOTINITED}
- * @param configuration the configuration to initialize the service with
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
- public static void deploy(Service service, Configuration configuration) {
- init(service, configuration);
- start(service);
- }
-
- /**
* Stop a service.
* <p/>Do nothing if the service is null or not
* in a state in which it can be/needs to be stopped.
@@ -111,10 +48,7 @@ public final class ServiceOperations {
*/
public static void stop(Service service) {
if (service != null) {
- Service.STATE state = service.getServiceState();
- if (state == Service.STATE.STARTED) {
- service.stop();
- }
+ service.stop();
}
}
@@ -127,14 +61,93 @@ public final class ServiceOperations {
* @return any exception that was caught; null if none was.
*/
public static Exception stopQuietly(Service service) {
+ return stopQuietly(LOG, service);
+ }
+
+ /**
+ * Stop a service; if it is null do nothing. Exceptions are caught and
+ * logged at warn level. (but not Throwables). This operation is intended to
+ * be used in cleanup operations
+ *
+ * @param log the log to warn at
+ * @param service a service; may be null
+ * @return any exception that was caught; null if none was.
+ * @see ServiceOperations#stopQuietly(Service)
+ */
+ public static Exception stopQuietly(Log log, Service service) {
try {
stop(service);
} catch (Exception e) {
- LOG.warn("When stopping the service " + service.getName()
- + " : " + e,
+ log.warn("When stopping the service " + service.getName()
+ + " : " + e,
e);
return e;
}
return null;
}
+
+
+ /**
+ * Class to manage a list of {@link ServiceStateChangeListener} instances,
+ * including a notification loop that is robust against changes to the list
+ * during the notification process.
+ */
+ public static class ServiceListeners {
+ /**
+ * List of state change listeners; it is final to guarantee
+ * that it will never be null.
+ */
+ private final List<ServiceStateChangeListener> listeners =
+ new ArrayList<ServiceStateChangeListener>();
+
+ /**
+ * Thread-safe addition of a new listener to the end of a list.
+ * Attempts to re-register a listener that is already registered
+ * will be ignored.
+ * @param l listener
+ */
+ public synchronized void add(ServiceStateChangeListener l) {
+ if(!listeners.contains(l)) {
+ listeners.add(l);
+ }
+ }
+
+ /**
+ * Remove any registration of a listener from the listener list.
+ * @param l listener
+ * @return true if the listener was found (and then removed)
+ */
+ public synchronized boolean remove(ServiceStateChangeListener l) {
+ return listeners.remove(l);
+ }
+
+ /**
+ * Reset the listener list
+ */
+ public synchronized void reset() {
+ listeners.clear();
+ }
+
+ /**
+ * Change to a new state and notify all listeners.
+ * This method will block until all notifications have been issued.
+ * It caches the list of listeners before the notification begins,
+ * so additions or removal of listeners will not be visible.
+ * @param service the service that has changed state
+ */
+ public void notifyListeners(Service service) {
+ //take a very fast snapshot of the callback list
+ //very much like CopyOnWriteArrayList, only more minimal
+ ServiceStateChangeListener[] callbacks;
+ synchronized (this) {
+ callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
+ }
+ //iterate through the listeners outside the synchronized method,
+ //ensuring that listener registration/unregistration doesn't break anything
+ for (ServiceStateChangeListener l : callbacks) {
+ l.stateChanged(service);
+ }
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java Thu Jun 13 15:58:38 2013
@@ -49,21 +49,21 @@ public abstract class AbstractLiveliness
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
assert !stopped : "starting when already stopped";
checkerThread = new Thread(new PingChecker());
checkerThread.setName("Ping Checker");
checkerThread.start();
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
stopped = true;
if (checkerThread != null) {
checkerThread.interrupt();
}
- super.stop();
+ super.serviceStop();
}
protected abstract void expire(O ob);