You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by jl...@apache.org on 2013/08/13 22:21:11 UTC
svn commit: r1513636 - in
/hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
hadoop-yarn/hadoop-y...
Author: jlowe
Date: Tue Aug 13 20:21:10 2013
New Revision: 1513636
URL: http://svn.apache.org/r1513636
Log:
YARN-1036. Distributed Cache gives inconsistent result if cache files get deleted from task tracker. Contributed by Mayank Bansal and Ravi Prakash
Modified:
hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Tue Aug 13 20:21:10 2013
@@ -14,6 +14,9 @@ Release 0.23.10 - UNRELEASED
YARN-985. Nodemanager should log where a resource was localized (Ravi
Prakash via jeagles)
+ YARN-1036. Distributed Cache gives inconsistent result if cache files get
+ deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Aug 13 20:21:10 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -65,8 +66,14 @@ class LocalResourcesTrackerImpl implemen
LocalResourceRequest req = event.getLocalResourceRequest();
LocalizedResource rsrc = localrsrc.get(req);
switch (event.getType()) {
+ case LOCALIZED: break;
case REQUEST:
- case LOCALIZED:
+ if (rsrc != null && (!isResourcePresent(rsrc))) {
+ LOG.info("Resource " + rsrc.getLocalPath()
+ + " is missing, localizing it again");
+ localrsrc.remove(req);
+ rsrc = null;
+ }
if (null == rsrc) {
rsrc = new LocalizedResource(req, dispatcher);
localrsrc.put(req, rsrc);
@@ -82,6 +89,24 @@ class LocalResourcesTrackerImpl implemen
rsrc.handle(event);
}
+ /**
+ * Checks whether the file backing a resource in the LOCALIZED state still
+ * exists, by testing its local path with java.io.File.
+ *
+ * @param rsrc the tracked resource to check
+ * @return false only if rsrc is LOCALIZED and its local path does not exist; true otherwise
+ */
+ public boolean isResourcePresent(LocalizedResource rsrc) {
+ boolean ret = true; // resources in any state other than LOCALIZED are reported as present
+ if (rsrc.getState() == ResourceState.LOCALIZED) {
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ if (!file.exists()) {
+ ret = false;
+ }
+ }
+ return ret;
+ }
+
@Override
public boolean contains(LocalResourceRequest resource) {
return localrsrc.containsKey(resource);
Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Tue Aug 13 20:21:10 2013
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -149,6 +151,86 @@ public class TestLocalResourcesTrackerIm
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testConsistency() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+ dispatcher = createDispatcher(new Configuration());
+ EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+ LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+ localrsrc.put(req1, lr1);
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc);
+
+ ResourceEvent req11Event = new ResourceRequestEvent(req1,
+ LocalResourceVisibility.PUBLIC, lc1);
+
+ ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
+
+ // Request R1 on behalf of container C1
+ tracker.handle(req11Event);
+
+ dispatcher.await();
+
+ // Verify refCount for R1 is 1
+ Assert.assertEquals(1, lr1.getRefCount());
+
+ dispatcher.await();
+ verifyTrackedResourceCount(tracker, 1);
+
+ // Mark R1 as localized at file:///tmp/r1, create that file, then delete it
+ ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
+ "file:///tmp/r1"), 1);
+ lr1.handle(rle);
+ Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+ Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1")));
+ LocalizedResource rsrcbefore = tracker.iterator().next();
+ File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
+ .toString());
+ Assert.assertTrue(resFile.exists());
+ Assert.assertTrue(resFile.delete());
+
+ // Request R1 again for C1 now that its local file has been deleted
+ tracker.handle(req11Event);
+
+ dispatcher.await();
+ lr1.handle(rle); // NOTE(review): lr1 is the instance created before the deletion - confirm it is the intended target
+ Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+ LocalizedResource rsrcafter = tracker.iterator().next();
+ if (rsrcbefore == rsrcafter) { // identity comparison: the tracker must now hold a different instance
+ Assert.fail("Localized resource should not be equal");
+ }
+ // Release resource1
+ tracker.handle(rel11Event);
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
+ private boolean createdummylocalizefile(Path path) { // creates an empty file at path; returns whether it was created
+ boolean ret = false;
+ File file = new File(path.toUri().getRawPath().toString());
+ try {
+ ret = file.createNewFile(); // false when a file with this name already exists
+ } catch (IOException e) {
+ e.printStackTrace(); // not rethrown; the caller only sees the false return value
+ }
+ return ret;
+ }
+
private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
int expected) {
int count = 0;