You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/09 03:33:39 UTC
svn commit: r1465853 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemana...
Author: vinodkv
Date: Tue Apr 9 01:33:38 2013
New Revision: 1465853
URL: http://svn.apache.org/r1465853
Log:
YARN-99. Modify private distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr 9 01:33:38 2013
@@ -201,6 +201,10 @@ Release 2.0.5-beta - UNRELEASED
to implement closeable so that they can be stopped when needed via
RPC.stopProxy(). (Siddharth Seth via vinodkv)
+ YARN-99. Modify private distributed cache to localize files such that no
+ local directory hits unix file count limits and thus prevent job failures.
+ (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java Tue Apr 9 01:33:38 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@VisibleForTesting
+public interface ResourceLocalizationSpec {
+
+ public void setResource(LocalResource rsrc);
+
+ public LocalResource getResource();
+
+ public void setDestinationDirectory(URL destinationDirectory);
+
+ public URL getDestinationDirectory();
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java Tue Apr 9 01:33:38 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProtoOrBuilder;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+
+public class ResourceLocalizationSpecPBImpl extends
+ ProtoBase<ResourceLocalizationSpecProto> implements
+ ResourceLocalizationSpec {
+
+ private ResourceLocalizationSpecProto proto = ResourceLocalizationSpecProto
+ .getDefaultInstance();
+ private ResourceLocalizationSpecProto.Builder builder = null;
+ private boolean viaProto;
+ private LocalResource resource = null;
+ private URL destinationDirectory = null;
+
+ public ResourceLocalizationSpecPBImpl() {
+ builder = ResourceLocalizationSpecProto.newBuilder();
+ }
+
+ public ResourceLocalizationSpecPBImpl(ResourceLocalizationSpecProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ @Override
+ public LocalResource getResource() {
+ ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+ if (resource != null) {
+ return resource;
+ }
+ if (!p.hasResource()) {
+ return null;
+ }
+ resource = new LocalResourcePBImpl(p.getResource());
+ return resource;
+ }
+
+ @Override
+ public void setResource(LocalResource rsrc) {
+ maybeInitBuilder();
+ resource = rsrc;
+ }
+
+ @Override
+ public URL getDestinationDirectory() {
+ ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+ if (destinationDirectory != null) {
+ return destinationDirectory;
+ }
+ if (!p.hasDestinationDirectory()) {
+ return null;
+ }
+ destinationDirectory = new URLPBImpl(p.getDestinationDirectory());
+ return destinationDirectory;
+ }
+
+ @Override
+ public void setDestinationDirectory(URL destinationDirectory) {
+ maybeInitBuilder();
+ this.destinationDirectory = destinationDirectory;
+ }
+
+ @Override
+ public ResourceLocalizationSpecProto getProto() {
+ mergeLocalToBuilder();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private synchronized void maybeInitBuilder() {
+ if (builder == null || viaProto) {
+ builder = ResourceLocalizationSpecProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToBuilder() {
+ ResourceLocalizationSpecProtoOrBuilder l = viaProto ? proto : builder;
+ if (this.resource != null
+ && !(l.getResource()
+ .equals(((LocalResourcePBImpl) resource).getProto()))) {
+ maybeInitBuilder();
+ builder.setResource(((LocalResourcePBImpl) resource).getProto());
+ }
+ if (this.destinationDirectory != null
+ && !(l.getDestinationDirectory()
+ .equals(((URLPBImpl) destinationDirectory).getProto()))) {
+ maybeInitBuilder();
+ builder.setDestinationDirectory(((URLPBImpl) destinationDirectory)
+ .getProto());
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java Tue Apr 9 01:33:38 2013
@@ -18,18 +18,13 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.*;
public interface LocalizerHeartbeatResponse {
- public LocalizerAction getLocalizerAction();
- public List<LocalResource> getAllResources();
- public LocalResource getLocalResource(int i);
+ public LocalizerAction getLocalizerAction();
public void setLocalizerAction(LocalizerAction action);
- public void addAllResources(List<LocalResource> resources);
- public void addResource(LocalResource resource);
- public void removeResource(int index);
- public void clearResources();
-}
+ public List<ResourceLocalizationSpec> getResourceSpecs();
+ public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java Tue Apr 9 01:33:38 2013
@@ -21,13 +21,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponseP
LocalizerHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
- private List<LocalResource> resources;
+ private List<ResourceLocalizationSpec> resourceSpecs;
public LocalizerHeartbeatResponsePBImpl() {
builder = LocalizerHeartbeatResponseProto.newBuilder();
}
- public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+ public LocalizerHeartbeatResponsePBImpl(
+ LocalizerHeartbeatResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponseP
}
private void mergeLocalToBuilder() {
- if (resources != null) {
+ if (resourceSpecs != null) {
addResourcesToProto();
}
}
@@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponseP
viaProto = false;
}
+ @Override
public LocalizerAction getLocalizerAction() {
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasAction()) {
@@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponseP
return convertFromProtoFormat(p.getAction());
}
- public List<LocalResource> getAllResources() {
- initResources();
- return this.resources;
- }
-
- public LocalResource getLocalResource(int i) {
+ @Override
+ public List<ResourceLocalizationSpec> getResourceSpecs() {
initResources();
- return this.resources.get(i);
+ return this.resourceSpecs;
}
public void setLocalizerAction(LocalizerAction action) {
@@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponseP
builder.setAction(convertToProtoFormat(action));
}
+ public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
+ maybeInitBuilder();
+ if (rsrcs == null) {
+ builder.clearResources();
+ return;
+ }
+ this.resourceSpecs = rsrcs;
+ }
+
private void initResources() {
- if (this.resources != null) {
+ if (this.resourceSpecs != null) {
return;
}
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<LocalResourceProto> list = p.getResourcesList();
- this.resources = new ArrayList<LocalResource>();
-
- for (LocalResourceProto c : list) {
- this.resources.add(convertFromProtoFormat(c));
+ List<ResourceLocalizationSpecProto> list = p.getResourcesList();
+ this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
+ for (ResourceLocalizationSpecProto c : list) {
+ this.resourceSpecs.add(convertFromProtoFormat(c));
}
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResources();
- if (this.resources == null)
+ if (this.resourceSpecs == null)
return;
- Iterable<LocalResourceProto> iterable =
- new Iterable<LocalResourceProto>() {
+ Iterable<ResourceLocalizationSpecProto> iterable =
+ new Iterable<ResourceLocalizationSpecProto>() {
@Override
- public Iterator<LocalResourceProto> iterator() {
- return new Iterator<LocalResourceProto>() {
+ public Iterator<ResourceLocalizationSpecProto> iterator() {
+ return new Iterator<ResourceLocalizationSpecProto>() {
- Iterator<LocalResource> iter = resources.iterator();
+ Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
@Override
public boolean hasNext() {
@@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponseP
}
@Override
- public LocalResourceProto next() {
- return convertToProtoFormat(iter.next());
+ public ResourceLocalizationSpecProto next() {
+ ResourceLocalizationSpec resource = iter.next();
+
+ return ((ResourceLocalizationSpecPBImpl)resource).getProto();
}
@Override
@@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponseP
builder.addAllResources(iterable);
}
- public void addAllResources(List<LocalResource> resources) {
- if (resources == null)
- return;
- initResources();
- this.resources.addAll(resources);
- }
- public void addResource(LocalResource resource) {
- initResources();
- this.resources.add(resource);
- }
-
- public void removeResource(int index) {
- initResources();
- this.resources.remove(index);
- }
-
- public void clearResources() {
- initResources();
- this.resources.clear();
- }
-
- private LocalResource convertFromProtoFormat(LocalResourceProto p) {
- return new LocalResourcePBImpl(p);
- }
-
- private LocalResourceProto convertToProtoFormat(LocalResource s) {
- return ((LocalResourcePBImpl)s).getProto();
+ private ResourceLocalizationSpec convertFromProtoFormat(
+ ResourceLocalizationSpecProto p) {
+ return new ResourceLocalizationSpecPBImpl(p);
}
private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponseP
private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
return LocalizerAction.valueOf(a.name());
}
-
-}
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Apr 9 01:33:38 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@@ -89,8 +91,6 @@ public class ContainerLocalizer {
private final String localizerId;
private final FileContext lfs;
private final Configuration conf;
- private final LocalDirAllocator appDirs;
- private final LocalDirAllocator userDirs;
private final RecordFactory recordFactory;
private final Map<LocalResource,Future<Path>> pendingResources;
private final String appCacheDirContextName;
@@ -112,8 +112,6 @@ public class ContainerLocalizer {
this.recordFactory = recordFactory;
this.conf = new Configuration();
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
- this.appDirs = new LocalDirAllocator(appCacheDirContextName);
- this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
}
@@ -197,10 +195,10 @@ public class ContainerLocalizer {
return new ExecutorCompletionService<Path>(exec);
}
- Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
+ Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
- Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
- return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+ DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+ return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
}
static long getEstimatedSize(LocalResource rsrc) {
@@ -238,25 +236,12 @@ public class ContainerLocalizer {
LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
switch (response.getLocalizerAction()) {
case LIVE:
- List<LocalResource> newResources = response.getAllResources();
- for (LocalResource r : newResources) {
- if (!pendingResources.containsKey(r)) {
- final LocalDirAllocator lda;
- switch (r.getVisibility()) {
- default:
- LOG.warn("Unknown visibility: " + r.getVisibility()
- + ", Using userDirs");
- //Falling back to userDirs for unknown visibility.
- case PUBLIC:
- case PRIVATE:
- lda = userDirs;
- break;
- case APPLICATION:
- lda = appDirs;
- break;
- }
- // TODO: Synchronization??
- pendingResources.put(r, cs.submit(download(lda, r, ugi)));
+ List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
+ for (ResourceLocalizationSpec newRsrc : newRsrcs) {
+ if (!pendingResources.containsKey(newRsrc.getResource())) {
+ pendingResources.put(newRsrc.getResource(), cs.submit(download(
+ new Path(newRsrc.getDestinationDirectory().getFile()),
+ newRsrc.getResource(), ugi)));
}
}
break;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Tue Apr 9 01:33:38 2013
@@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Apr 9 01:33:38 2013
@@ -80,10 +80,12 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -326,7 +329,7 @@ public class ResourceLocalizationService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
- dispatcher, false, super.getConfig()));
+ dispatcher, true, super.getConfig()));
if (null != appRsrc.putIfAbsent(
ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@@ -476,6 +479,21 @@ public class ResourceLocalizationService
}
}
+ private String getUserFileCachePath(String user) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
+ return path;
+ }
+
+ private String getUserAppCachePath(String user, String appId) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+ + Path.SEPARATOR + appId;
+ return path;
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -803,7 +821,20 @@ public class ResourceLocalizationService
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
- response.addResource(next);
+ try {
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ ResourceLocalizationSpec rsrc =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(rsrc);
+ response.setResourceSpecs(rsrcs);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be found."
+ + "Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ // TODO fail? Already translated several times...
+ }
} else if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@@ -812,7 +843,8 @@ public class ResourceLocalizationService
}
return response;
}
-
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
@@ -835,6 +867,7 @@ public class ResourceLocalizationService
new ResourceLocalizedEvent(req,
ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
stat.getLocalSize()));
+ localizationCompleted(stat);
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
// TODO: Synchronization
@@ -844,7 +877,17 @@ public class ResourceLocalizationService
response.setLocalizerAction(LocalizerAction.LIVE);
LocalResource next = findNextResource();
if (next != null) {
- response.addResource(next);
+ try {
+ ResourceLocalizationSpec resource =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(resource);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be " +
+ "found. Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ //TODO fail? Already translated several times...
+ }
}
break;
case FETCH_PENDING:
@@ -854,6 +897,7 @@ public class ResourceLocalizationService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
+ localizationCompleted(stat);
// TODO: Why is this event going directly to the container. Why not
// the resource itself? What happens to the resource? Is it removed?
dispatcher.getEventHandler().handle(
@@ -869,9 +913,53 @@ public class ResourceLocalizationService
break;
}
}
+ response.setResourceSpecs(rsrcs);
return response;
}
+ private void localizationCompleted(LocalResourceStatus stat) {
+ try {
+ LocalResource rsrc = stat.getResource();
+ LocalResourceRequest key = new LocalResourceRequest(rsrc);
+ String user = context.getUser();
+ ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ LocalResourceVisibility vis = rsrc.getVisibility();
+ LocalResourcesTracker tracker =
+ getLocalResourcesTracker(vis, user, appId);
+ if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
+ tracker.localizationCompleted(key, true);
+ } else {
+ tracker.localizationCompleted(key, false);
+ }
+ } catch (URISyntaxException e) {
+ LOG.error("Invalid resource URL specified", e);
+ }
+ }
+
+ private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+ URISyntaxException {
+ String user = context.getUser();
+ ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
+ LocalResourceVisibility vis = rsrc.getVisibility();
+ LocalResourcesTracker tracker =
+ getLocalResourcesTracker(vis, user, appId);
+ String cacheDirectory = null;
+ if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
+ cacheDirectory = getUserFileCachePath(user);
+ } else {// APPLICATION ONLY
+ cacheDirectory = getUserAppCachePath(user, appId.toString());
+ }
+ Path dirPath =
+ dirsHandler.getLocalPathForWrite(cacheDirectory,
+ ContainerLocalizer.getEstimatedSize(rsrc), false);
+ return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
+
+ }
+
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
@@ -1033,4 +1121,4 @@ public class ResourceLocalizationService
del.delete(null, dirPath, new Path[] {});
}
-}
\ No newline at end of file
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java Tue Apr 9 01:33:38 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+public class NodeManagerBuilderUtils {
+
+ public static ResourceLocalizationSpec newResourceLocalizationSpec(
+ LocalResource rsrc, Path path) {
+ URL local = ConverterUtils.getYarnUrlFromPath(path);
+ ResourceLocalizationSpec resourceLocalizationSpec =
+ Records.newRecord(ResourceLocalizationSpec.class);
+ resourceLocalizationSpec.setDestinationDirectory(local);
+ resourceLocalizationSpec.setResource(rsrc);
+ return resourceLocalizationSpec;
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Tue Apr 9 01:33:38 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
DIE = 2;
}
+message ResourceLocalizationSpecProto {
+ optional LocalResourceProto resource = 1;
+ optional URLProto destination_directory = 2;
+}
+
message LocalizerHeartbeatResponseProto {
optional LocalizerActionProto action = 1;
- repeated LocalResourceProto resources = 2;
+ repeated ResourceLocalizationSpecProto resources = 2;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Tue Apr 9 01:33:38 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.junit.Test;
-import static org.junit.Assert.*;
public class TestPBRecordImpl {
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
static LocalResource createResource() {
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
assertTrue(ret instanceof LocalResourcePBImpl);
- ret.setResource(
- ConverterUtils.getYarnUrlFromPath(
- new Path("hdfs://y.ak:8020/foo/bar")));
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+ "hdfs://y.ak:8020/foo/bar")));
ret.setSize(4344L);
ret.setTimestamp(3141592653589793L);
ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
return ret;
}
- static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+ static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
+ throws URISyntaxException {
LocalizerHeartbeatResponse ret =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
ret.setLocalizerAction(LocalizerAction.LIVE);
- ret.addResource(createResource());
+ LocalResource rsrc = createResource();
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ ResourceLocalizationSpec resource =
+ recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+ resource.setResource(rsrc);
+ resource.setDestinationDirectory(ConverterUtils
+ .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
+ rsrcs.add(resource);
+ ret.setResourceSpecs(rsrcs);
+ System.out.println(resource);
return ret;
}
- @Test
+ @Test(timeout=10000)
public void testLocalResourceStatusSerDe() throws Exception {
LocalResourceStatus rsrcS = createLocalResourceStatus();
assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
assertEquals(createResource(), rsrcD.getResource());
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerStatusSerDe() throws Exception {
LocalizerStatus rsrcS = createLocalizerStatus();
assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerHeartbeatResponseSerDe() throws Exception {
LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
assertEquals(rsrcS, rsrcD);
- assertEquals(createResource(), rsrcS.getLocalResource(0));
- assertEquals(createResource(), rsrcD.getLocalResource(0));
+ assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+ assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java Tue Apr 9 01:33:38 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatRespo
implements LocalizerHeartbeatResponse {
LocalizerAction action;
- List<LocalResource> rsrc;
+ List<ResourceLocalizationSpec> resourceSpecs;
MockLocalizerHeartbeatResponse() {
- rsrc = new ArrayList<LocalResource>();
+ resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
}
MockLocalizerHeartbeatResponse(
- LocalizerAction action, List<LocalResource> rsrc) {
+ LocalizerAction action, List<ResourceLocalizationSpec> resources) {
this.action = action;
- this.rsrc = rsrc;
+ this.resourceSpecs = resources;
}
public LocalizerAction getLocalizerAction() { return action; }
- public List<LocalResource> getAllResources() { return rsrc; }
- public LocalResource getLocalResource(int i) { return rsrc.get(i); }
public void setLocalizerAction(LocalizerAction action) {
this.action = action;
}
- public void addAllResources(List<LocalResource> resources) {
- rsrc.addAll(resources);
+
+ @Override
+ public List<ResourceLocalizationSpec> getResourceSpecs() {
+ return resourceSpecs;
+}
+
+ @Override
+ public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
+ this.resourceSpecs = resourceSpecs;
}
- public void addResource(LocalResource resource) { rsrc.add(resource); }
- public void removeResource(int index) { rsrc.remove(index); }
- public void clearResources() { rsrc.clear(); }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Tue Apr 9 01:33:38 2013
@@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
@@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -95,12 +96,33 @@ public class TestContainerLocalizer {
public void testContainerLocalizerMain() throws Exception {
ContainerLocalizer localizer = setupContainerLocalizerForTest();
+ // verify created cache
+ List<Path> privCacheList = new ArrayList<Path>();
+ List<Path> appCacheList = new ArrayList<Path>();
+ for (Path p : localDirs) {
+ Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+ Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+ privCacheList.add(privcache);
+ Path appDir =
+ new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+ Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+ appCacheList.add(appcache);
+ }
+
// mock heartbeat responses from NM
- LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcC = getMockRsrc(random,
- LocalResourceVisibility.APPLICATION);
- LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+ ResourceLocalizationSpec rsrcA =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+ ResourceLocalizationSpec rsrcB =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+ ResourceLocalizationSpec rsrcC =
+ getMockRsrc(random, LocalResourceVisibility.APPLICATION,
+ appCacheList.get(0));
+ ResourceLocalizationSpec rsrcD =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcA)))
@@ -111,27 +133,33 @@ public class TestContainerLocalizer {
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcD)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
- Collections.<LocalResource>emptyList()))
+ Collections.<ResourceLocalizationSpec>emptyList()))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
null));
- doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
+ LocalResource tRsrcA = rsrcA.getResource();
+ LocalResource tRsrcB = rsrcB.getResource();
+ LocalResource tRsrcC = rsrcC.getResource();
+ LocalResource tRsrcD = rsrcD.getResource();
+ doReturn(
+ new FakeDownload(rsrcA.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcA),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
+ doReturn(
+ new FakeDownload(rsrcB.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcB),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
+ doReturn(
+ new FakeDownload(rsrcC.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcC),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
+ doReturn(
+ new FakeDownload(rsrcD.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcD),
isA(UserGroupInformation.class));
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
-
- // verify created cache
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@@ -143,15 +171,14 @@ public class TestContainerLocalizer {
Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
}
-
// verify tokens read at expected location
verify(spylfs).open(tokenPath);
// verify downloaded resources reported to NM
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource())));
// verify all HB use localizerID provided
verify(nmProxy, never()).heartbeat(argThat(
@@ -306,10 +333,12 @@ public class TestContainerLocalizer {
return mockRF;
}
- static LocalResource getMockRsrc(Random r,
- LocalResourceVisibility vis) {
- LocalResource rsrc = mock(LocalResource.class);
+ static ResourceLocalizationSpec getMockRsrc(Random r,
+ LocalResourceVisibility vis, Path p) {
+ ResourceLocalizationSpec resourceLocalizationSpec =
+ mock(ResourceLocalizationSpec.class);
+ LocalResource rsrc = mock(LocalResource.class);
String name = Long.toHexString(r.nextLong());
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
@@ -322,7 +351,10 @@ public class TestContainerLocalizer {
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(vis);
- return rsrc;
+ when(resourceLocalizationSpec.getResource()).thenReturn(rsrc);
+ when(resourceLocalizationSpec.getDestinationDirectory()).
+ thenReturn(ConverterUtils.getYarnUrlFromPath(p));
+ return resourceLocalizationSpec;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Apr 9 01:33:38 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -375,7 +377,7 @@ public class TestResourceLocalizationSer
}
}
- @Test
+ @Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new YarnConfiguration();
@@ -386,12 +388,17 @@ public class TestResourceLocalizationSer
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
- String[] sDirs = new String[4];
- for (int i = 0; i < 4; ++i) {
- localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
- sDirs[i] = localDirs.get(i).toString();
- }
+ String[] sDirs = new String[1];
+ // Making sure that we have only one local disk so that it will only be
+ // selected for consecutive resource localization calls. This is required
+ // to test LocalCacheDirectoryManager.
+ localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+ sDirs[0] = localDirs.get(0).toString();
+
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ // Adding configuration to make sure there is only one file per
+ // directory
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
@@ -452,12 +459,23 @@ public class TestResourceLocalizationSer
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
- final LocalResource resource = getPrivateMockedResource(r);
- final LocalResourceRequest req = new LocalResourceRequest(resource);
+ final LocalResource resource1 = getPrivateMockedResource(r);
+ LocalResource resource2 = null;
+ do {
+ resource2 = getPrivateMockedResource(r);
+ } while (resource2 == null || resource2.equals(resource1));
+ // above call to make sure we don't get identical resources.
+
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+ final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
- rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+ List<LocalResourceRequest> privateResourceList =
+ new ArrayList<LocalResourceRequest>();
+ privateResourceList.add(req1);
+ privateResourceList.add(req2);
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(1000);
@@ -471,33 +489,64 @@ public class TestResourceLocalizationSer
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
- LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr);
- when(rsrcStat.getResource()).thenReturn(resource);
- when(rsrcStat.getLocalSize()).thenReturn(4344L);
+ when(rsrcStat1.getResource()).thenReturn(resource1);
+ when(rsrcStat2.getResource()).thenReturn(resource2);
+ when(rsrcStat1.getLocalSize()).thenReturn(4344L);
+ when(rsrcStat2.getLocalSize()).thenReturn(2342L);
URL locPath = getPath("/cache/private/blah");
- when(rsrcStat.getLocalPath()).thenReturn(locPath);
- when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrcStat1.getLocalPath()).thenReturn(locPath);
+ when(rsrcStat2.getLocalPath()).thenReturn(locPath);
+ when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList())
- .thenReturn(Collections.singletonList(rsrcStat))
+ .thenReturn(Collections.singletonList(rsrcStat1))
+ .thenReturn(Collections.singletonList(rsrcStat2))
.thenReturn(Collections.<LocalResourceStatus>emptyList());
- // get rsrc
+ String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
+ Path.SEPARATOR + "user0" + Path.SEPARATOR +
+ ContainerLocalizer.FILECACHE;
+
+ // get first resource
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
- assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+ assertEquals(1, response.getResourceSpecs().size());
+ assertEquals(req1,
+ new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
+ URL localizedPath =
+ response.getResourceSpecs().get(0).getDestinationDirectory();
+ assertTrue(localizedPath.getFile().endsWith(localPath));
+
+ // get second resource
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ assertEquals(1, response.getResourceSpecs().size());
+ assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
+ .get(0).getResource()));
+ localizedPath =
+ response.getResourceSpecs().get(0).getDestinationDirectory();
+ // Resource's destination path should be now inside sub directory 0 as
+ // LocalCacheDirectoryManager will be used and we have restricted number
+ // of files per directory to 1.
+ assertTrue(localizedPath.getFile().endsWith(
+ localPath + Path.SEPARATOR + "0"));
// empty rsrc
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
- assertEquals(0, response.getAllResources().size());
+ assertEquals(0, response.getResourceSpecs().size());
// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+ dispatcher.await();
// verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc =
new ArgumentMatcher<ContainerEvent>() {
@@ -508,9 +557,9 @@ public class TestResourceLocalizationSer
&& c.getContainerID() == evt.getContainerID();
}
};
- dispatcher.await();
- verify(containerBus).handle(argThat(matchesContainerLoc));
-
+ // total 2 resource localzation calls. one for each resource.
+ verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {