You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/31 00:56:15 UTC
svn commit: r1367352 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
hadoop-yarn/hadoop-yarn-server...
Author: tucu
Date: Mon Jul 30 22:56:15 2012
New Revision: 1367352
URL: http://svn.apache.org/viewvc?rev=1367352&view=rev
Log:
MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files get deleted from tasktracker. (mayank_bansal via tucu)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jul 30 22:56:15 2012
@@ -156,6 +156,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4465. Update description of yarn.nodemanager.address property.
(bowang via tucu)
+ MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files
+ get deleted from tasktracker. (mayank_bansal via tucu)
+
Release 2.1.0-alpha - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Jul 30 22:56:15 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
/**
* A collection of {@link LocalizedResource}s all of same
@@ -67,6 +69,12 @@ class LocalResourcesTrackerImpl implemen
switch (event.getType()) {
case REQUEST:
case LOCALIZED:
+ if (rsrc != null && (!isResourcePresent(rsrc))) {
+ LOG.info("Resource " + rsrc.getLocalPath()
+ + " is missing, localizing it again");
+ localrsrc.remove(req);
+ rsrc = null;
+ }
if (null == rsrc) {
rsrc = new LocalizedResource(req, dispatcher);
localrsrc.put(req, rsrc);
@@ -82,6 +90,24 @@ class LocalResourcesTrackerImpl implemen
rsrc.handle(event);
}
+ /**
+ * This module checks if the resource which was localized is already present
+ * or not
+ *
+ * @param rsrc
+ * @return true/false based on resource is present or not
+ */
+ public boolean isResourcePresent(LocalizedResource rsrc) {
+ boolean ret = true;
+ if (rsrc.getState() == ResourceState.LOCALIZED) {
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ if (!file.exists()) {
+ ret = false;
+ }
+ }
+ return ret;
+ }
+
@Override
public boolean contains(LocalResourceRequest resource) {
return localrsrc.containsKey(resource);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Mon Jul 30 22:56:15 2012
@@ -5,6 +5,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
+import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl {
@@ -131,6 +134,86 @@ public class TestLocalResourcesTrackerIm
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testConsistency() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+ dispatcher = createDispatcher(new Configuration());
+ EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+ LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+ localrsrc.put(req1, lr1);
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc);
+
+ ResourceEvent req11Event = new ResourceRequestEvent(req1,
+ LocalResourceVisibility.PUBLIC, lc1);
+
+ ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
+
+ // Localize R1 for C1
+ tracker.handle(req11Event);
+
+ dispatcher.await();
+
+ // Verify refCount for R1 is 1
+ Assert.assertEquals(1, lr1.getRefCount());
+
+ dispatcher.await();
+ verifyTrackedResourceCount(tracker, 1);
+
+ // Localize resource1
+ ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
+ "file:///tmp/r1"), 1);
+ lr1.handle(rle);
+ Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+ Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1")));
+ LocalizedResource rsrcbefore = tracker.iterator().next();
+ File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
+ .toString());
+ Assert.assertTrue(resFile.exists());
+ Assert.assertTrue(resFile.delete());
+
+ // Localize R1 for C1
+ tracker.handle(req11Event);
+
+ dispatcher.await();
+ lr1.handle(rle);
+ Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+ LocalizedResource rsrcafter = tracker.iterator().next();
+ if (rsrcbefore == rsrcafter) {
+ Assert.fail("Localized resource should not be equal");
+ }
+ // Release resource1
+ tracker.handle(rel11Event);
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
+ private boolean createdummylocalizefile(Path path) {
+ boolean ret = false;
+ File file = new File(path.toUri().getRawPath().toString());
+ try {
+ ret = file.createNewFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return ret;
+ }
+
private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
int expected) {
int count = 0;