You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/05/30 17:37:28 UTC
svn commit: r1598640 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodema...
Author: junping_du
Date: Fri May 30 15:37:27 2014
New Revision: 1598640
URL: http://svn.apache.org/r1598640
Log:
YARN-1338. Recover localized resource cache state upon nodemanager restart (Contributed by Jason Lowe)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri May 30 15:37:27 2014
@@ -29,6 +29,9 @@ Release 2.5.0 - UNRELEASED
YARN-1362. Distinguish between nodemanager shutdown for decommission vs shutdown
for restart. (Jason Lowe via junping_du)
+ YARN-1338. Recover localized resource cache state upon nodemanager restart
+ (Jason Lowe via junping_du)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Fri May 30 15:37:27 2014
@@ -156,6 +156,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ </dependency>
</dependencies>
<profiles>
@@ -292,6 +296,7 @@
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
+ <include>yarn_server_nodemanager_recovery.proto</include>
<include>yarn_server_nodemanager_service_protos.proto</include>
<include>LocalizationProtocol.proto</include>
</includes>
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Fri May 30 15:37:27 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -67,6 +68,8 @@ public interface Context {
ApplicationACLsManager getApplicationACLsManager();
+ NMStateStoreService getNMStateStore();
+
boolean getDecommissioned();
void setDecommissioned(boolean isDecommissioned);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri May 30 15:37:27 2014
@@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -78,6 +81,7 @@ public class NodeManager extends Composi
private ContainerManagerImpl containerManager;
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
+ private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false);
@@ -115,9 +119,10 @@ public class NodeManager extends Composi
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInNM nmTokenSecretManager) {
+ NMTokenSecretManagerInNM nmTokenSecretManager,
+ NMStateStoreService stateStore) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
- dirsHandler, aclsManager);
+ dirsHandler, aclsManager, stateStore);
}
protected void doSecureLogin() throws IOException {
@@ -125,11 +130,8 @@ public class NodeManager extends Composi
YarnConfiguration.NM_PRINCIPAL);
}
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
-
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
+ private void initAndStartRecoveryStore(Configuration conf)
+ throws IOException {
boolean recoveryEnabled = conf.getBoolean(
YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
@@ -142,7 +144,36 @@ public class NodeManager extends Composi
}
Path recoveryRoot = new Path(recoveryDirName);
recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
+ nmStore = new NMLeveldbStateStoreService();
+ } else {
+ nmStore = new NMNullStateStoreService();
+ }
+ nmStore.init(conf);
+ nmStore.start();
+ }
+
+ /**
+  * Stops the recovery state store and, if this node has been decommissioned
+  * and the store supports recovery, deletes the recovery directory named by
+  * YarnConfiguration.NM_RECOVERY_DIR from the local filesystem.
+  *
+  * Both nmStore and context are null until serviceInit has assigned them
+  * (nmStore in initAndStartRecoveryStore, context afterwards), so this
+  * method tolerates being reached before either exists.
+  * NOTE(review): presumably serviceStop can run after a failed serviceInit;
+  * confirm against the service lifecycle.
+  *
+  * @throws IOException if the local filesystem cannot be obtained or the
+  *         delete raises an error
+  */
+ private void stopRecoveryStore() throws IOException {
+   if (nmStore == null) {
+     // the store was never created, so there is nothing to stop or remove
+     return;
+   }
+   nmStore.stop();
+   if (context != null && context.getDecommissioned()
+       && nmStore.canRecover()) {
+     LOG.info("Removing state store due to decommission");
+     Configuration conf = getConfig();
+     Path recoveryRoot = new Path(
+         conf.get(YarnConfiguration.NM_RECOVERY_DIR));
+     LOG.info("Removing state store at " + recoveryRoot
+         + " due to decommission");
+     FileSystem recoveryFs = FileSystem.getLocal(conf);
+     if (!recoveryFs.delete(recoveryRoot, true)) {
+       LOG.warn("Unable to delete " + recoveryRoot);
+     }
    }
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager =
new NMContainerTokenSecretManager(conf);
@@ -171,7 +202,7 @@ public class NodeManager extends Composi
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
- nmTokenSecretManager);
+ nmTokenSecretManager, nmStore);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@@ -220,6 +251,7 @@ public class NodeManager extends Composi
return;
}
super.serviceStop();
+ stopRecoveryStore();
DefaultMetricsSystem.shutdown();
}
@@ -272,11 +304,13 @@ public class NodeManager extends Composi
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
+ private final NMStateStoreService stateStore;
private boolean isDecommissioned = false;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
- LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
+ LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
+ NMStateStoreService stateStore) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@@ -284,6 +318,7 @@ public class NodeManager extends Composi
this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
+ this.stateStore = stateStore;
}
/**
@@ -352,6 +387,11 @@ public class NodeManager extends Composi
}
@Override
+ public NMStateStoreService getNMStateStore() {
+ // Returns the state store instance that was passed to the NMContext
+ // constructor; the field is final, so the same instance is always returned.
+ return stateStore;
+ }
+
+ @Override
public boolean getDecommissioned() {
return isDecommissioned;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri May 30 15:37:27 2014
@@ -22,6 +22,7 @@ import static org.apache.hadoop.service.
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -116,6 +117,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -218,6 +220,15 @@ public class ContainerManagerImpl extend
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf);
+ recover();
+ }
+
+ private void recover() throws IOException, URISyntaxException {
+ NMStateStoreService stateStore = context.getNMStateStore();
+ if (stateStore.canRecover()) {
+ rsrcLocalizationSrvc.recoverLocalizedResources(
+ stateStore.loadLocalizationState());
+ }
}
protected LogHandler createLogHandler(Configuration conf, Context context,
@@ -239,7 +250,7 @@ public class ContainerManagerImpl extend
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
- deletionContext, dirsHandler);
+ deletionContext, dirsHandler, context.getNMStateStore());
}
protected ContainersLauncher createContainersLauncher(Context context,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Fri May 30 15:37:27 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* {@link LocalCacheDirectoryManager} is used for managing hierarchical
* directories for local cache. It will allow to restrict the number of files in
@@ -99,6 +101,57 @@ public class LocalCacheDirectoryManager
}
}
+ /**
+  * Records one more file in the cache sub directory identified by a
+  * relative path, registering that sub directory first if it is not yet
+  * known. A directory whose count reaches the per-directory limit is taken
+  * out of the set of non-full directories.
+  *
+  * @param relPath the relative path; null is treated as the empty path and
+  *        surrounding whitespace is trimmed
+  */
+ public synchronized void incrementFileCountForPath(String relPath) {
+   final String key;
+   if (relPath == null) {
+     key = "";
+   } else {
+     key = relPath.trim();
+   }
+
+   Directory dir = knownDirectories.get(key);
+   if (dir == null) {
+     // first time this sub directory is seen: derive its number and track it
+     final int dirNo = Directory.getDirectoryNumber(key);
+     if (dirNo > totalSubDirectories) {
+       totalSubDirectories = dirNo;
+     }
+     dir = new Directory(dirNo);
+     nonFullDirectories.add(dir);
+     knownDirectories.put(dir.getRelativePath(), dir);
+   }
+
+   final int filesInDir = dir.incrementAndGetCount();
+   if (filesInDir >= perDirectoryFileLimit) {
+     nonFullDirectories.remove(dir);
+   }
+ }
+
+ /**
+  * Walks up from a directory inside a local cache tree until it reaches a
+  * path component that is not a cache sub directory name, and returns that
+  * path as the cache root. A component counts as a sub directory name only
+  * if it is exactly one character long and parses as a digit in radix
+  * DIRECTORIES_PER_LEVEL.
+  *
+  * @param path the directory within a cache directory
+  * @return the local cache directory root or null if not found
+  */
+ public static Path getCacheDirectoryRoot(Path path) {
+   for (Path current = path; current != null;
+       current = current.getParent()) {
+     final String name = current.getName();
+     if (name.length() != 1) {
+       return current;
+     }
+     try {
+       final int dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL);
+       if (dirnum >= DIRECTORIES_PER_LEVEL) {
+         return current;
+       }
+     } catch (NumberFormatException e) {
+       // not a sub directory digit, so this component is the cache root
+       return current;
+     }
+   }
+   // ran out of parents without finding a non-digit component
+   return null;
+ }
+
+ @VisibleForTesting
+ synchronized Directory getDirectory(String relPath) {
+ return knownDirectories.get(relPath);
+ }
+
/*
* It limits the number of files and sub directories in the directory to the
* limit LocalCacheDirectoryManager#perDirectoryFileLimit.
@@ -108,11 +161,9 @@ public class LocalCacheDirectoryManager
private final String relativePath;
private int fileCount;
- public Directory(int directoryNo) {
- fileCount = 0;
- if (directoryNo == 0) {
- relativePath = "";
- } else {
+ static String getRelativePath(int directoryNo) {
+ String relativePath = "";
+ if (directoryNo > 0) {
String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
StringBuffer sb = new StringBuffer();
if (tPath.length() == 1) {
@@ -128,6 +179,27 @@ public class LocalCacheDirectoryManager
}
relativePath = sb.toString();
}
+ return relativePath;
+ }
+
+ static int getDirectoryNumber(String relativePath) {
+ String numStr = relativePath.replace("/", "");
+ if (relativePath.isEmpty()) {
+ return 0;
+ }
+ if (numStr.length() > 1) {
+ // undo step from getRelativePath() to reuse 0th sub directory
+ String firstChar = Integer.toString(
+ Integer.parseInt(numStr.substring(0, 1),
+ DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL);
+ numStr = firstChar + numStr.substring(1);
+ }
+ return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1;
+ }
+
+ /**
+  * Creates a directory entry with a file count of zero whose relative path
+  * is derived from the directory number via getRelativePath(int).
+  *
+  * @param directoryNo the directory number
+  */
+ public Directory(int directoryNo) {
+ fileCount = 0;
+ relativePath = getRelativePath(directoryNo);
}
public int incrementAndGetCount() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Fri May 30 15:37:27 2014
@@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
@@ -34,18 +31,11 @@ import com.google.common.annotations.Vis
interface LocalResourcesTracker
extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
- // TODO: Not used at all!!
- boolean contains(LocalResourceRequest resource);
-
boolean remove(LocalizedResource req, DeletionService delService);
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
String getUser();
- long nextUniqueNumber();
-
- @VisibleForTesting
- @Private
LocalizedResource getLocalizedResource(LocalResourceRequest request);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri May 30 15:37:27 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -27,14 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import com.google.common.annotations.VisibleForTesting;
@@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen
.compile(RANDOM_DIR_REGEX);
private final String user;
+ private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
@@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
+ private NMStateStoreService stateStore;
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
- this(user, dispatcher,
+ public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+ Configuration conf, NMStateStoreService stateStore) {
+ this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
- useLocalCacheDirectoryManager, conf);
+ useLocalCacheDirectoryManager, conf, stateStore);
}
- LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
+ boolean useLocalCacheDirectoryManager, Configuration conf,
+ NMStateStoreService stateStore) {
+ this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
@@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
+ this.stateStore = stateStore;
}
/*
@@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
- localrsrc.remove(req);
- decrementFileCountForLocalCacheDirectory(req, rsrc);
+ removeResource(req);
rsrc = null;
}
if (null == rsrc) {
@@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen
}
break;
case LOCALIZATION_FAILED:
- decrementFileCountForLocalCacheDirectory(req, null);
/*
* If resource localization fails then Localized resource will be
* removed from local cache.
*/
- localrsrc.remove(req);
+ removeResource(req);
+ break;
+ case RECOVERED:
+ if (rsrc != null) {
+ LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
+ return;
+ }
+ rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
+ localrsrc.put(req, rsrc);
break;
}
+
rsrc.handle(event);
+
+ if (event.getType() == ResourceEventType.LOCALIZED) {
+ if (rsrc.getLocalPath() != null) {
+ try {
+ stateStore.finishResourceLocalization(user, appId,
+ buildLocalizedResourceProto(rsrc));
+ } catch (IOException ioe) {
+ LOG.error("Error storing resource state for " + rsrc, ioe);
+ }
+ } else {
+ LOG.warn("Resource " + rsrc + " localized without a location");
+ }
+ }
+ }
+
+ private LocalizedResource recoverResource(LocalResourceRequest req,
+ ResourceRecoveredEvent event) { // rebuilds tracker bookkeeping for the resource named in a RECOVERED event
+ // unique number for a resource is the directory of the resource
+ Path localDir = event.getLocalPath().getParent();
+ long rsrcId = Long.parseLong(localDir.getName()); // NumberFormatException if the directory name is not a number
+
+ // update ID generator to avoid conflicts with existing resources
+ while (true) { // compare-and-set retry: leave the generator at max(current, rsrcId)
+ long currentRsrcId = uniqueNumberGenerator.get();
+ long nextRsrcId = Math.max(currentRsrcId, rsrcId);
+ if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
+ break;
+ }
+ }
+
+ incrementFileCountForLocalCacheDirectory(localDir.getParent()); // count the recovered file against the directory holding its numbered dir
+
+ return new LocalizedResource(req, dispatcher); // path and size are not set here; the event itself carries them
+ }
+
+ private LocalizedResourceProto buildLocalizedResourceProto(
+ LocalizedResource rsrc) { // packs the resource's request, local path and size into a LocalizedResourceProto
+ return LocalizedResourceProto.newBuilder()
+ .setResource(buildLocalResourceProto(rsrc.getRequest()))
+ .setLocalPath(rsrc.getLocalPath().toString()) // NullPointerException if the resource has no local path yet
+ .setSize(rsrc.getSize())
+ .build();
+ }
+
+ private LocalResourceProto buildLocalResourceProto(LocalResource lr) { // returns the protobuf form of lr
+ LocalResourcePBImpl lrpb;
+ if (!(lr instanceof LocalResourcePBImpl)) { // not PB-backed: copy the fields into a fresh instance
+ lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
+ lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
+ lr.getPattern());
+ }
+ lrpb = (LocalResourcePBImpl) lr; // NOTE(review): assumes LocalResource.newInstance yields a LocalResourcePBImpl — confirm
+ return lrpb.getProto();
+ }
+
+ public void incrementFileCountForLocalCacheDirectory(Path cacheDir) { // bumps the file count for cacheDir in its root's directory manager
+ if (useLocalCacheDirectoryManager) { // no-op when directory managers are disabled
+ Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
+ cacheDir);
+ if (cacheRoot != null) { // nothing is counted when no cache root is returned for cacheDir
+ LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
+ if (dir == null) { // first use of this root: create a manager, keeping any one registered concurrently
+ dir = new LocalCacheDirectoryManager(conf);
+ LocalCacheDirectoryManager otherDir =
+ directoryManagers.putIfAbsent(cacheRoot, dir);
+ if (otherDir != null) {
+ dir = otherDir;
+ }
+ }
+ if (cacheDir.equals(cacheRoot)) {
+ dir.incrementFileCountForPath(""); // the root itself is addressed by the empty relative path
+ } else {
+ String dirStr = cacheDir.toUri().getRawPath();
+ String rootStr = cacheRoot.toUri().getRawPath();
+ dir.incrementFileCountForPath(
+ dirStr.substring(rootStr.length() + 1)); // cacheDir relative to cacheRoot; +1 skips the separator
+ }
+ }
+ }
+ }
/*
@@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen
}
@Override
- public boolean contains(LocalResourceRequest resource) {
- return localrsrc.containsKey(resource);
- }
-
- @Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
@@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen
+ " with non-zero refcount");
return false;
} else { // ResourceState is LOCALIZED or INIT
- localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
- decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+ removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
+ private void removeResource(LocalResourceRequest req) { // drops req from the in-memory map, the directory file count and the state store
+ LocalizedResource rsrc = localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc); // called even when rsrc is null (req was not in the map)
+ if (rsrc != null) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) { // the state store is only touched when a local path is known
+ try {
+ stateStore.removeLocalizedResource(user, appId, localPath);
+ } catch (IOException e) { // store failure is logged and not propagated
+ LOG.error("Unable to remove resource " + rsrc + " from state store",
+ e);
+ }
+ }
+ }
+ }
+
/**
* Returns the path up to the random directory component.
*/
@@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen
@Override
public Path
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
@@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
- Path rPath = localDirPath;
+ rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
@@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
- return rPath;
- } else {
- return localDirPath;
}
- }
- @Override
- public long nextUniqueNumber() {
- return uniqueNumberGenerator.incrementAndGet();
+ rPath = new Path(rPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
+ Path localPath = new Path(rPath, req.getPath().getName());
+ LocalizedResource rsrc = localrsrc.get(req);
+ rsrc.setLocalPath(localPath);
+ LocalResource lr = LocalResource.newInstance(req.getResource(),
+ req.getType(), req.getVisibility(), req.getSize(),
+ req.getTimestamp());
+ try {
+ stateStore.startResourceLocalization(user, appId,
+ ((LocalResourcePBImpl) lr).getProto(), localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to record localization start for " + rsrc, e);
+ }
+ return rPath;
}
- @VisibleForTesting
- @Private
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
-}
\ No newline at end of file
+
+ @VisibleForTesting
+ LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) { // test hook: looks up the manager registered for localDirPath
+ LocalCacheDirectoryManager mgr = null; // stays null when useLocalCacheDirectoryManager is false
+ if (useLocalCacheDirectoryManager) {
+ mgr = directoryManagers.get(localDirPath);
+ }
+ return mgr;
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri May 30 15:37:27 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -54,8 +55,8 @@ public class LocalizedResource implement
private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
- Path localPath;
- long size = -1;
+ volatile Path localPath;
+ volatile long size = -1;
final LocalResourceRequest rsrc;
final Dispatcher dispatcher;
final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@@ -76,6 +77,8 @@ public class LocalizedResource implement
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
+ .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+ ResourceEventType.RECOVERED, new RecoveredTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@@ -157,6 +160,10 @@ public class LocalizedResource implement
return localPath;
}
+ public void setLocalPath(Path localPath) { // records where this resource lives locally
+ this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath); // stored without scheme and authority
+ }
+
public long getTimestamp() {
return timestamp.get();
}
@@ -234,7 +241,8 @@ public class LocalizedResource implement
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
- rsrc.localPath = locEvent.getLocation();
+ rsrc.localPath =
+ Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
rsrc.size = locEvent.getSize();
for (ContainerId container : rsrc.ref) {
rsrc.dispatcher.getEventHandler().handle(
@@ -291,4 +299,13 @@ public class LocalizedResource implement
rsrc.release(relEvent.getContainer());
}
}
+
+ private static class RecoveredTransition extends ResourceTransition { // transition run for a ResourceRecoveredEvent
+ @Override
+ public void transition(LocalizedResource rsrc, ResourceEvent event) { // copies path and size from the event onto rsrc
+ ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event;
+ rsrc.localPath = recoveredEvent.getLocalPath(); // NOTE(review): stored as given, not passed through Path.getPathWithoutSchemeAndAuthority — confirm intended
+ rsrc.size = recoveredEvent.getSize();
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri May 30 15:37:27 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@@ -109,10 +112,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -142,6 +150,7 @@ public class ResourceLocalizationService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager;
+ private NMStateStoreService stateStore;
private LocalResourcesTracker publicRsrc;
@@ -163,7 +172,7 @@ public class ResourceLocalizationService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
- LocalDirsHandlerService dirsHandler) {
+ LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@@ -175,6 +184,7 @@ public class ResourceLocalizationService
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
+ this.stateStore = stateStore;
}
FileContext getLocalFileContext(Configuration conf) {
@@ -203,15 +213,17 @@ public class ResourceLocalizationService
@Override
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
- this.publicRsrc =
- new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
+ true, conf, stateStore);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
- cleanUpLocalDir(lfs,delService);
+ if (!stateStore.canRecover()) {
+ cleanUpLocalDir(lfs,delService);
+ }
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
@@ -249,6 +261,74 @@ public class ResourceLocalizationService
super.serviceInit(conf);
}
+ //Recover localized resources after an NM restart
+ public void recoverLocalizedResources(RecoveredLocalizationState state)
+ throws URISyntaxException { // replays public, per-user and per-application tracker state into the trackers
+ LocalResourceTrackerState trackerState = state.getPublicTrackerState();
+ recoverTrackerResources(publicRsrc, trackerState); // public cache goes into the existing publicRsrc tracker
+
+ for (Map.Entry<String, RecoveredUserResources> userEntry :
+ state.getUserResources().entrySet()) {
+ String user = userEntry.getKey();
+ RecoveredUserResources userResources = userEntry.getValue();
+ trackerState = userResources.getPrivateTrackerState();
+ if (!trackerState.isEmpty()) { // a private tracker is only created when there is state to recover
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ null, dispatcher, true, super.getConfig(), stateStore); // null appId, directory manager enabled
+ LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+ tracker);
+ if (oldTracker != null) { // a tracker already registered for this user is reused
+ tracker = oldTracker;
+ }
+ recoverTrackerResources(tracker, trackerState);
+ }
+
+ for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+ userResources.getAppTrackerStates().entrySet()) {
+ trackerState = appEntry.getValue();
+ if (!trackerState.isEmpty()) { // same rule per application: skip empty state
+ ApplicationId appId = appEntry.getKey();
+ String appIdStr = ConverterUtils.toString(appId); // appRsrc is keyed by the string form of the id
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, super.getConfig(), stateStore); // directory manager disabled for app trackers
+ LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+ tracker);
+ if (oldTracker != null) {
+ tracker = oldTracker;
+ }
+ recoverTrackerResources(tracker, trackerState);
+ }
+ }
+ }
+ }
+
+ private void recoverTrackerResources(LocalResourcesTracker tracker,
+ LocalResourceTrackerState state) throws URISyntaxException { // feeds one tracker's recovered state back into it
+ for (LocalizedResourceProto proto : state.getLocalizedResources()) { // completed localizations: hand path and size to the tracker
+ LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ LOG.info("Recovering localized resource " + req + " at "
+ + proto.getLocalPath());
+ tracker.handle(new ResourceRecoveredEvent(req,
+ new Path(proto.getLocalPath()), proto.getSize()));
+ }
+
+ for (Map.Entry<LocalResourceProto, Path> entry :
+ state.getInProgressResources().entrySet()) { // localizations that had started but not finished
+ LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ Path localPath = entry.getValue();
+ tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); // registered with size 0 so it can be removed through the tracker below
+
+ // delete any in-progress localizations, containers will request again
+ LOG.info("Deleting in-progress localization for " + req + " at "
+ + localPath);
+ tracker.remove(tracker.getLocalizedResource(req), delService);
+ }
+
+ // TODO: remove untracked directories in local filesystem
+ }
+
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
return localizerTracker.processHeartbeat(status);
@@ -337,17 +417,10 @@ public class ResourceLocalizationService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
- dispatcher, true, super.getConfig()));
- if (null != appRsrc.putIfAbsent(
- ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
- .getConfig()))) {
- LOG.warn("Initializing application " + app + " already present");
- assert false; // TODO: FIXME assert doesn't help
- // ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
- }
+ null, dispatcher, true, super.getConfig(), stateStore));
+ String appIdStr = ConverterUtils.toString(app.getAppId());
+ appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
+ app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
// 1) Signal container init
//
// This is handled by the ApplicationImpl state machine and allows
@@ -446,18 +519,28 @@ public class ResourceLocalizationService
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
- String userName;
- String appIDStr;
+ String userName = application.getUser();
+ ApplicationId appId = application.getAppId();
+ String appIDStr = application.toString();
LocalResourcesTracker appLocalRsrcsTracker =
- appRsrc.remove(ConverterUtils.toString(application.getAppId()));
- if (null == appLocalRsrcsTracker) {
+ appRsrc.remove(ConverterUtils.toString(appId));
+ if (appLocalRsrcsTracker != null) {
+ for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) {
+ try {
+ stateStore.removeLocalizedResource(userName, appId, localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+ + " from state store", e);
+ }
+ }
+ }
+ } else {
LOG.warn("Removing uninitialized application " + application);
}
- // TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
- userName = application.getUser();
- appIDStr = application.toString();
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
@@ -668,19 +751,15 @@ public class ResourceLocalizationService
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
- Path publicDirDestPath =
+ Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
- Path hierarchicalPath =
- publicRsrc.getPathForLocalization(key, publicDirDestPath);
- if (!hierarchicalPath.equals(publicDirDestPath)) {
- publicDirDestPath = hierarchicalPath;
+ Path publicDirDestPath =
+ publicRsrc.getPathForLocalization(key, publicRootPath);
+ if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
- publicDirDestPath =
- new Path(publicDirDestPath, Long.toString(publicRsrc
- .nextUniqueNumber()));
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@@ -968,9 +1047,8 @@ public class ResourceLocalizationService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
- dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
- dirPath);
- return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+ return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Fri May 30 15:37:27 2014
@@ -31,5 +31,7 @@ public enum ResourceEventType {
/** See {@link ResourceReleaseEvent} */
RELEASE,
/** See {@link ResourceFailedLocalizationEvent} */
- LOCALIZATION_FAILED
+ LOCALIZATION_FAILED,
+ /** See {@link ResourceRecoveredEvent} */
+ RECOVERED
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java?rev=1598640&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceRecoveredEvent.java Fri May 30 15:37:27 2014
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class ResourceRecoveredEvent extends ResourceEvent { // RECOVERED event carrying a resource's local path and size
+
+ private final Path localPath; // local path supplied by the creator of the event
+ private final long size; // size supplied by the creator of the event
+
+ public ResourceRecoveredEvent(LocalResourceRequest rsrc, Path localPath,
+ long size) {
+ super(rsrc, ResourceEventType.RECOVERED); // event type is always RECOVERED
+ this.localPath = localPath;
+ this.size = size;
+ }
+
+ public Path getLocalPath() { // returns the path given at construction, unchanged
+ return localPath;
+ }
+
+ public long getSize() { // returns the size given at construction, unchanged
+ return size;
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1598640&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Fri May 30 15:37:27 2014
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.recovery;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+public class NMLeveldbStateStoreService extends NMStateStoreService {
+
+ public static final Log LOG =
+ LogFactory.getLog(NMLeveldbStateStoreService.class);
+
+ private static final String DB_NAME = "yarn-nm-state";
+ private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
+ private static final String DB_SCHEMA_VERSION = "1.0";
+
+ private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
+ private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
+ LOCALIZATION_KEY_PREFIX + "public/";
+ private static final String LOCALIZATION_PRIVATE_KEY_PREFIX =
+ LOCALIZATION_KEY_PREFIX + "private/";
+ private static final String LOCALIZATION_STARTED_SUFFIX = "started/";
+ private static final String LOCALIZATION_COMPLETED_SUFFIX = "completed/";
+ private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
+ private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
+
+ private DB db;
+
+ public NMLeveldbStateStoreService() {
+ super(NMLeveldbStateStoreService.class.getName());
+ }
+
+ @Override
+ protected void startStorage() throws IOException { // no work is performed in this hook
+ }
+
+ @Override
+ protected void closeStorage() throws IOException { // closes the leveldb handle, if one is set
+ if (db != null) { // db may be unset, in which case there is nothing to close
+ db.close();
+ }
+ }
+
+
+ /**
+ * Loads the recovered localization state from the leveldb store: the
+ * public tracker state followed by the resources of every user found
+ * under the private key prefix.
+ *
+ * @return the recovered state
+ * @throws IOException if a private key has no user component, if leveldb
+ * reports an error while iterating, or if the iterator cannot be closed
+ */
+ @Override
+ public RecoveredLocalizationState loadLocalizationState()
+ throws IOException {
+ RecoveredLocalizationState state = new RecoveredLocalizationState();
+
+ // declared outside the try so the finally block can release it
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
+ state.publicTrackerState = loadResourceTrackerState(iter,
+ LOCALIZATION_PUBLIC_KEY_PREFIX);
+
+ iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
+ while (iter.hasNext()) {
+ Entry<byte[],byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
+ break;
+ }
+
+ // the user name is the key component right after the private prefix
+ int userEndPos = key.indexOf('/',
+ LOCALIZATION_PRIVATE_KEY_PREFIX.length());
+ if (userEndPos < 0) {
+ throw new IOException("Unable to determine user in resource key: "
+ + key);
+ }
+ String user = key.substring(
+ LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
+ // the helper consumes every entry under this user's prefix
+ state.userResources.put(user, loadUserLocalizedResources(iter,
+ key.substring(0, userEndPos+1)));
+ }
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ } finally {
+ // release the leveldb iterator on every path, including errors
+ if (iter != null) {
+ iter.close();
+ }
+ }
+
+ return state;
+ }
+
+ private LocalResourceTrackerState loadResourceTrackerState(
+ LeveldbIterator iter, String keyPrefix) throws IOException { // reads one tracker's completed and started entries from iter
+ final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX;
+ final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
+ LocalResourceTrackerState state = new LocalResourceTrackerState();
+ while (iter.hasNext()) {
+ Entry<byte[],byte[]> entry = iter.peekNext(); // inspected only; the helpers below call iter.next()
+ String key = asString(entry.getKey());
+ if (!key.startsWith(keyPrefix)) { // first key outside this tracker's prefix ends the scan
+ break;
+ }
+
+ if (key.startsWith(completedPrefix)) {
+ state.localizedResources = loadCompletedResources(iter,
+ completedPrefix);
+ } else if (key.startsWith(startedPrefix)) {
+ state.inProgressResources = loadStartedResources(iter, startedPrefix);
+ } else { // a key under keyPrefix that is neither completed nor started is an error
+ throw new IOException("Unexpected key in resource tracker state: "
+ + key);
+ }
+ }
+
+ return state;
+ }
+
+ /**
+  * Collects the completed-resource records stored under the given prefix,
+  * advancing the iterator past each record it consumes.
+  *
+  * @param iter iterator positioned at the first candidate entry
+  * @param keyPrefix key prefix of the completed-resource entries
+  * @return the parsed records, in iteration order
+  * @throws IOException if a stored value cannot be parsed
+  */
+ private List<LocalizedResourceProto> loadCompletedResources(
+     LeveldbIterator iter, String keyPrefix) throws IOException {
+   List<LocalizedResourceProto> completed =
+       new ArrayList<LocalizedResourceProto>();
+   // The iterator only advances once the current entry has been accepted.
+   for (; iter.hasNext(); iter.next()) {
+     Entry<byte[],byte[]> current = iter.peekNext();
+     String key = asString(current.getKey());
+     if (!key.startsWith(keyPrefix)) {
+       break;
+     }
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Loading completed resource from " + key);
+     }
+     completed.add(LocalizedResourceProto.parseFrom(current.getValue()));
+   }
+
+   return completed;
+ }
+
+ private Map<LocalResourceProto, Path> loadStartedResources(
+ LeveldbIterator iter, String keyPrefix) throws IOException {
+ Map<LocalResourceProto, Path> rsrcs =
+ new HashMap<LocalResourceProto, Path>();
+ while (iter.hasNext()) {
+ Entry<byte[],byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(keyPrefix)) {
+ break;
+ }
+
+ Path localPath = new Path(key.substring(keyPrefix.length()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading in-progress resource at " + localPath);
+ }
+ rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath);
+ iter.next();
+ }
+
+ return rsrcs;
+ }
+
+ private RecoveredUserResources loadUserLocalizedResources(
+ LeveldbIterator iter, String keyPrefix) throws IOException {
+ RecoveredUserResources userResources = new RecoveredUserResources();
+ while (iter.hasNext()) {
+ Entry<byte[],byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(keyPrefix)) {
+ break;
+ }
+
+ if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) {
+ userResources.privateTrackerState = loadResourceTrackerState(iter,
+ keyPrefix + LOCALIZATION_FILECACHE_SUFFIX);
+ } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX,
+ keyPrefix.length())) {
+ int appIdStartPos = keyPrefix.length() +
+ LOCALIZATION_APPCACHE_SUFFIX.length();
+ int appIdEndPos = key.indexOf('/', appIdStartPos);
+ if (appIdEndPos < 0) {
+ throw new IOException("Unable to determine appID in resource key: "
+ + key);
+ }
+ ApplicationId appId = ConverterUtils.toApplicationId(
+ key.substring(appIdStartPos, appIdEndPos));
+ userResources.appTrackerStates.put(appId,
+ loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1)));
+ } else {
+ throw new IOException("Unexpected user resource key " + key);
+ }
+ }
+ return userResources;
+ }
+
+ @Override
+ public void startResourceLocalization(String user, ApplicationId appId,
+ LocalResourceProto proto, Path localPath) throws IOException {
+ String key = getResourceStartedKey(user, appId, localPath.toString());
+ try {
+ db.put(bytes(key), proto.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Replaces the resource's "started" record with a "completed" record,
+  * submitting the delete and the put together in one write batch.
+  *
+  * @throws IOException if the database write fails
+  */
+ @Override
+ public void finishResourceLocalization(String user, ApplicationId appId,
+     LocalizedResourceProto proto) throws IOException {
+   final String path = proto.getLocalPath();
+   final String keyToDelete = getResourceStartedKey(user, appId, path);
+   final String keyToStore = getResourceCompletedKey(user, appId, path);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Storing localized resource to " + keyToStore);
+   }
+
+   try {
+     WriteBatch update = db.createWriteBatch();
+     try {
+       update.delete(bytes(keyToDelete));
+       update.put(bytes(keyToStore), proto.toByteArray());
+       db.write(update);
+     } finally {
+       update.close();
+     }
+   } catch (DBException e) {
+     throw new IOException(e.getMessage(), e);
+   }
+ }
+
+ /**
+  * Deletes both the "started" and the "completed" record of the resource at
+  * the given local path, submitting both deletes in one write batch.
+  *
+  * @throws IOException if the database write fails
+  */
+ @Override
+ public void removeLocalizedResource(String user, ApplicationId appId,
+     Path localPath) throws IOException {
+   final String path = localPath.toString();
+   final String startedRecord = getResourceStartedKey(user, appId, path);
+   final String completedRecord = getResourceCompletedKey(user, appId, path);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Removing local resource at " + path);
+   }
+
+   try {
+     WriteBatch removal = db.createWriteBatch();
+     try {
+       removal.delete(bytes(startedRecord));
+       removal.delete(bytes(completedRecord));
+       db.write(removal);
+     } finally {
+       removal.close();
+     }
+   } catch (DBException e) {
+     throw new IOException(e.getMessage(), e);
+   }
+ }
+
+ private String getResourceStartedKey(String user, ApplicationId appId,
+ String localPath) {
+ return getResourceTrackerKeyPrefix(user, appId)
+ + LOCALIZATION_STARTED_SUFFIX + localPath;
+ }
+
+ private String getResourceCompletedKey(String user, ApplicationId appId,
+ String localPath) {
+ return getResourceTrackerKeyPrefix(user, appId)
+ + LOCALIZATION_COMPLETED_SUFFIX + localPath;
+ }
+
+ private String getResourceTrackerKeyPrefix(String user,
+ ApplicationId appId) {
+ if (user == null) {
+ return LOCALIZATION_PUBLIC_KEY_PREFIX;
+ }
+ if (appId == null) {
+ return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/"
+ + LOCALIZATION_FILECACHE_SUFFIX;
+ }
+ return LOCALIZATION_PRIVATE_KEY_PREFIX + user + "/"
+ + LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
+ }
+
+
+ /**
+ * Opens the leveldb state database, creating it when it does not exist yet,
+ * and verifies that its stored schema version equals DB_SCHEMA_VERSION.
+ *
+ * @param conf configuration passed to createStorageDir to locate the store
+ * @throws IOException if the database cannot be opened or created, if the
+ * schema version entry is missing, or if it differs from
+ * DB_SCHEMA_VERSION
+ */
+ @Override
+ protected void initStorage(Configuration conf)
+ throws IOException {
+ Path storeRoot = createStorageDir(conf);
+ Options options = new Options();
+ // First attempt: open an existing database only, so that a missing one
+ // is reported as an error and handled by the fallback below.
+ options.createIfMissing(false);
+ options.logger(new LeveldbLogger());
+ LOG.info("Using state database at " + storeRoot + " for recovery");
+ File dbfile = new File(storeRoot.toString());
+ byte[] schemaVersionData = null;
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ try {
+ schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ } catch (NativeDB.DBException e) {
+ // A "not found" / "does not exist" failure is treated as "no database
+ // yet"; any other open failure is rethrown unchanged.
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating state database at " + dbfile);
+ options.createIfMissing(true);
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ // Stamp the new database with the current schema version.
+ schemaVersionData = bytes(DB_SCHEMA_VERSION);
+ db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData);
+ } catch (DBException dbErr) {
+ throw new IOException(dbErr.getMessage(), dbErr);
+ }
+ } else {
+ throw e;
+ }
+ }
+ if (schemaVersionData != null) {
+ String schemaVersion = asString(schemaVersionData);
+ // only support exact schema matches for now
+ if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
+ throw new IOException("Incompatible state database schema, found "
+ + schemaVersion + " expected " + DB_SCHEMA_VERSION);
+ }
+ } else {
+ // An existing database without a schema version entry is rejected.
+ throw new IOException("State database schema version not found");
+ }
+ }
+
+ /**
+  * Resolves the database directory beneath the configured recovery
+  * directory and creates it on the local filesystem with mode 0700.
+  *
+  * @param conf configuration supplying YarnConfiguration.NM_RECOVERY_DIR
+  * @return path of the database directory
+  * @throws IOException if no recovery directory is configured or the local
+  *         filesystem operation fails
+  */
+ private Path createStorageDir(Configuration conf) throws IOException {
+   final String recoveryDir = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
+   if (recoveryDir == null) {
+     throw new IOException("No store location directory configured in " +
+         YarnConfiguration.NM_RECOVERY_DIR);
+   }
+
+   final Path dbRoot = new Path(recoveryDir, DB_NAME);
+   FileSystem localFs = FileSystem.getLocal(conf);
+   FsPermission ownerOnly = new FsPermission((short)0700);
+   localFs.mkdirs(dbRoot, ownerOnly);
+   return dbRoot;
+ }
+
+
+ /**
+ * Logger installed on the leveldb Options in initStorage; it forwards each
+ * message it receives to this class's own Log at INFO level.
+ */
+ private static class LeveldbLogger implements Logger {
+ // Log named after LeveldbLogger itself.
+ private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1598640&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Fri May 30 15:37:27 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+
+/**
+ * The state store to use when state isn't being stored: every write is a
+ * no-op, canRecover() reports false, and loading state is unsupported.
+ */
+public class NMNullStateStoreService extends NMStateStoreService {
+
+ /** Creates the store, passing this class's fully-qualified name to the superclass as its name. */
+ public NMNullStateStoreService() {
+ super(NMNullStateStoreService.class.getName());
+ }
+
+ /** @return always false, since nothing is ever stored */
+ @Override
+ public boolean canRecover() {
+ return false;
+ }
+
+ /**
+ * Not supported by this store.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public RecoveredLocalizationState loadLocalizationState()
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ /** No-op: nothing is recorded. */
+ @Override
+ public void startResourceLocalization(String user, ApplicationId appId,
+ LocalResourceProto proto, Path localPath) throws IOException {
+ }
+
+ /** No-op: nothing is recorded. */
+ @Override
+ public void finishResourceLocalization(String user, ApplicationId appId,
+ LocalizedResourceProto proto) throws IOException {
+ }
+
+ /** No-op: there are no records to remove. */
+ @Override
+ public void removeLocalizedResource(String user, ApplicationId appId,
+ Path localPath) throws IOException {
+ }
+
+ /** No-op: there is no backing storage to initialize. */
+ @Override
+ protected void initStorage(Configuration conf) throws IOException {
+ }
+
+ /** No-op: there is no backing storage to start. */
+ @Override
+ protected void startStorage() throws IOException {
+ }
+
+ /** No-op: there is no backing storage to close. */
+ @Override
+ protected void closeStorage() throws IOException {
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1598640&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Fri May 30 15:37:27 2014
@@ -0,0 +1,163 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.recovery;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+
+/**
+ * Base class for services that record localized resource state and load it
+ * back for recovery. Subclasses supply the storage through initStorage,
+ * startStorage and closeStorage, which are invoked from the service
+ * lifecycle methods serviceInit, serviceStart and serviceStop respectively.
+ */
+@Private
+@Unstable
+public abstract class NMStateStoreService extends AbstractService {
+
+ /**
+ * @param name name passed through to the AbstractService constructor
+ */
+ public NMStateStoreService(String name) {
+ super(name);
+ }
+
+ /**
+ * Recovered state of one resource tracker: the resources whose localization
+ * completed and the resources whose localization had been started.
+ */
+ public static class LocalResourceTrackerState {
+ // Records of resources that finished localizing.
+ List<LocalizedResourceProto> localizedResources =
+ new ArrayList<LocalizedResourceProto>();
+ // Resource requests that were started, mapped to their local path.
+ Map<LocalResourceProto, Path> inProgressResources =
+ new HashMap<LocalResourceProto, Path>();
+
+ /** @return the completed resource records (the internal list, not a copy) */
+ public List<LocalizedResourceProto> getLocalizedResources() {
+ return localizedResources;
+ }
+
+ /** @return the in-progress resources and their local paths (the internal map, not a copy) */
+ public Map<LocalResourceProto, Path> getInProgressResources() {
+ return inProgressResources;
+ }
+
+ /** @return true if there are neither completed nor in-progress resources */
+ public boolean isEmpty() {
+ return localizedResources.isEmpty() && inProgressResources.isEmpty();
+ }
+ }
+
+ /**
+ * Recovered state for a single user: the user's private tracker plus one
+ * tracker per application.
+ */
+ public static class RecoveredUserResources {
+ LocalResourceTrackerState privateTrackerState =
+ new LocalResourceTrackerState();
+ Map<ApplicationId, LocalResourceTrackerState> appTrackerStates =
+ new HashMap<ApplicationId, LocalResourceTrackerState>();
+
+ /** @return the state of the user's private resource tracker */
+ public LocalResourceTrackerState getPrivateTrackerState() {
+ return privateTrackerState;
+ }
+
+ /** @return the per-application tracker states, keyed by application ID */
+ public Map<ApplicationId, LocalResourceTrackerState>
+ getAppTrackerStates() {
+ return appTrackerStates;
+ }
+ }
+
+ /**
+ * Complete recovered localization state: the public tracker plus the
+ * resources of each user.
+ */
+ public static class RecoveredLocalizationState {
+ LocalResourceTrackerState publicTrackerState =
+ new LocalResourceTrackerState();
+ Map<String, RecoveredUserResources> userResources =
+ new HashMap<String, RecoveredUserResources>();
+
+ /** @return the state of the public resource tracker */
+ public LocalResourceTrackerState getPublicTrackerState() {
+ return publicTrackerState;
+ }
+
+ /** @return the recovered resources of each user, keyed by user name */
+ public Map<String, RecoveredUserResources> getUserResources() {
+ return userResources;
+ }
+ }
+
+ /**
+ * Initialize the state storage by delegating to initStorage.
+ * @param conf the configuration handed to initStorage
+ * @throws IOException if initStorage fails
+ */
+ @Override
+ public void serviceInit(Configuration conf) throws IOException {
+ initStorage(conf);
+ }
+
+ /**
+ * Start the state storage for use by delegating to startStorage.
+ * @throws IOException if startStorage fails
+ */
+ @Override
+ public void serviceStart() throws IOException {
+ startStorage();
+ }
+
+ /**
+ * Shutdown the state storage by delegating to closeStorage.
+ * @throws IOException if closeStorage fails
+ */
+ @Override
+ public void serviceStop() throws IOException {
+ closeStorage();
+ }
+
+ /**
+ * Whether this store can recover state.
+ * @return true in this base class; subclasses that store nothing override it
+ */
+ public boolean canRecover() {
+ return true;
+ }
+
+
+ /**
+ * Load the state of localized resources
+ * @return recovered localized resource state
+ * @throws IOException if the state cannot be loaded
+ */
+ public abstract RecoveredLocalizationState loadLocalizationState()
+ throws IOException;
+
+ /**
+ * Record the start of localization for a resource
+ * @param user the username or null if the resource is public
+ * @param appId the application ID if the resource is app-specific or null
+ * @param proto the resource request
+ * @param localPath local filesystem path where the resource will be stored
+ * @throws IOException if the record cannot be stored
+ */
+ public abstract void startResourceLocalization(String user,
+ ApplicationId appId, LocalResourceProto proto, Path localPath)
+ throws IOException;
+
+ /**
+ * Record the completion of a resource localization
+ * @param user the username or null if the resource is public
+ * @param appId the application ID if the resource is app-specific or null
+ * @param proto the serialized localized resource
+ * @throws IOException if the record cannot be stored
+ */
+ public abstract void finishResourceLocalization(String user,
+ ApplicationId appId, LocalizedResourceProto proto) throws IOException;
+
+ /**
+ * Remove records related to a resource localization
+ * @param user the username or null if the resource is public
+ * @param appId the application ID if the resource is app-specific or null
+ * @param localPath local filesystem path of the resource whose records
+ * are to be removed
+ * @throws IOException if the records cannot be removed
+ */
+ public abstract void removeLocalizedResource(String user,
+ ApplicationId appId, Path localPath) throws IOException;
+
+
+ /**
+ * Prepare the backing storage; called from serviceInit.
+ * @param conf the service configuration
+ * @throws IOException if the storage cannot be initialized
+ */
+ protected abstract void initStorage(Configuration conf) throws IOException;
+
+ /**
+ * Make the backing storage ready for use; called from serviceStart.
+ * @throws IOException if the storage cannot be started
+ */
+ protected abstract void startStorage() throws IOException;
+
+ /**
+ * Release the backing storage; called from serviceStop.
+ * @throws IOException if the storage cannot be closed
+ */
+ protected abstract void closeStorage() throws IOException;
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1598640&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Fri May 30 15:37:27 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "YarnServerNodemanagerRecoveryProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_protos.proto";
+
+// Record persisted by the nodemanager state store once a resource has
+// finished localizing.
+message LocalizedResourceProto {
+ // The resource request this record belongs to.
+ optional LocalResourceProto resource = 1;
+ // Local path of the resource; the leveldb store uses it to build the
+ // record's database key.
+ optional string localPath = 2;
+ // NOTE(review): presumably the localized size in bytes -- confirm with the writer.
+ optional int64 size = 3;
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1598640&r1=1598639&r2=1598640&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Fri May 30 15:37:27 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
public class DummyContainerManager extends ContainerManagerImpl {
@@ -75,7 +76,7 @@ public class DummyContainerManager exten
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec,
- deletionContext, super.dirsHandler) {
+ deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {