You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by jl...@apache.org on 2013/08/13 22:21:11 UTC

svn commit: r1513636 - in /hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yarn/hadoop-y...

Author: jlowe
Date: Tue Aug 13 20:21:10 2013
New Revision: 1513636

URL: http://svn.apache.org/r1513636
Log:
YARN-1036. Distributed Cache gives inconsistent result if cache files get deleted from task tracker. Contributed by Mayank Bansal and Ravi Prakash

Modified:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Tue Aug 13 20:21:10 2013
@@ -14,6 +14,9 @@ Release 0.23.10 - UNRELEASED
     YARN-985. Nodemanager should log where a resource was localized (Ravi
     Prakash via jeagles)
 
+    YARN-1036. Distributed Cache gives inconsistent result if cache files get
+    deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Aug 13 20:21:10 2013
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -65,8 +66,14 @@ class LocalResourcesTrackerImpl implemen
     LocalResourceRequest req = event.getLocalResourceRequest();
     LocalizedResource rsrc = localrsrc.get(req);
     switch (event.getType()) {
+    case LOCALIZED: break;
     case REQUEST:
-    case LOCALIZED:
+      if (rsrc != null && (!isResourcePresent(rsrc))) {
+        LOG.info("Resource " + rsrc.getLocalPath()
+            + " is missing, localizing it again");
+        localrsrc.remove(req);
+        rsrc = null;
+      }
       if (null == rsrc) {
         rsrc = new LocalizedResource(req, dispatcher);
         localrsrc.put(req, rsrc);
@@ -82,6 +89,24 @@ class LocalResourcesTrackerImpl implemen
     rsrc.handle(event);
   }
 
+  /**
+   * Checks whether a resource recorded as LOCALIZED still exists on local
+   * disk. Resources in any other state are reported as present.
+   *
+   * @param rsrc the tracked resource to check
+   * @return false only if rsrc is LOCALIZED and its local file is gone
+   */
+  public boolean isResourcePresent(LocalizedResource rsrc) {
+    if (rsrc.getState() != ResourceState.LOCALIZED) {
+      // Only a LOCALIZED resource is checked against the file system.
+      return true;
+    }
+    // Use the decoded path: getRawPath() keeps percent-escapes (e.g. %20),
+    // which do not name the real file on disk.
+    File file = new File(rsrc.getLocalPath().toUri().getPath());
+    return file.exists();
+  }
+  
   @Override
   public boolean contains(LocalResourceRequest resource) {
     return localrsrc.containsKey(resource);

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1513636&r1=1513635&r2=1513636&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Tue Aug 13 20:21:10 2013
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -149,6 +151,86 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testConsistency() {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      dispatcher = createDispatcher(new Configuration());
+      EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
+      EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          dispatcher, localrsrc);
+
+      ResourceEvent req11Event = new ResourceRequestEvent(req1,
+          LocalResourceVisibility.PUBLIC, lc1);
+
+      ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
+
+      // Request R1 for C1
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+
+      // Verify refCount for R1 is 1
+      Assert.assertEquals(1, lr1.getRefCount());
+
+      dispatcher.await();
+      verifyTrackedResourceCount(tracker, 1);
+
+      // Mark R1 as localized at /tmp/r1, then create and delete its file
+      ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
+          "file:///tmp/r1"), 1);
+      lr1.handle(rle);
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1")));
+      LocalizedResource rsrcbefore = tracker.iterator().next();
+      File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
+          .toString());
+      Assert.assertTrue(resFile.exists());
+      Assert.assertTrue(resFile.delete());
+
+      // Request R1 again for C1 now that its local file has been deleted
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+      lr1.handle(rle);
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      LocalizedResource rsrcafter = tracker.iterator().next();
+      if (rsrcbefore == rsrcafter) {
+        Assert.fail("Localized resource should not be equal");
+      }
+      // Release R1 for C1
+      tracker.handle(rel11Event);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  /** Creates an empty file at path, tolerating one left by an earlier run. */
+  private boolean createdummylocalizefile(Path path) {
+    File file = new File(path.toUri().getPath());
+    try {
+      return file.createNewFile() || file.isFile();
+    } catch (IOException e) {
+      // Surface the cause instead of hiding it behind a false return.
+      throw new RuntimeException("Cannot create " + file, e);
+    }
+  }
+  
   private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
       int expected) {
     int count = 0;