You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/27 19:27:51 UTC

svn commit: r1224971 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/

Author: vinodkv
Date: Tue Dec 27 18:27:51 2011
New Revision: 1224971

URL: http://svn.apache.org/viewvc?rev=1224971&view=rev
Log:
MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to the NM immediately after downloading a resource instead of always waiting for a second. Contributed by Siddharth Seth.
svn merge -c 1224970 --ignore-ancestry ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1224971&r1=1224970&r2=1224971&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Dec 27 18:27:51 2011
@@ -92,6 +92,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
     Vavilapalli via sseth)
 
+    MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM
+    immediately after downloading a resource instead of always waiting for a
+    second. (Siddarth Seth via vinodkv)
+
   BUG FIXES
     MAPREDUCE-2950. [Rumen] Fixed TestUserResolve. (Ravi Gummadi via amarrk)
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1224971&r1=1224970&r2=1224971&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Dec 27 18:27:51 2011
@@ -31,7 +31,9 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -163,7 +165,8 @@ public class ContainerLocalizer {
     ExecutorService exec = null;
     try {
       exec = createDownloadThreadPool();
-      localizeFiles(nodeManager, exec, ugi);
+      CompletionService<Path> ecs = createCompletionService(exec);
+      localizeFiles(nodeManager, ecs, ugi);
       return 0;
     } catch (Throwable e) {
       // Print traces to stdout so that they can be logged by the NM address
@@ -182,6 +185,10 @@ public class ContainerLocalizer {
       .setNameFormat("ContainerLocalizer Downloader").build());
   }
 
+  CompletionService<Path> createCompletionService(ExecutorService exec) {
+    return new ExecutorCompletionService<Path>(exec);
+  }
+
   Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
     Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
@@ -206,7 +213,8 @@ public class ContainerLocalizer {
   }
 
   private void localizeFiles(LocalizationProtocol nodemanager,
-      ExecutorService exec, UserGroupInformation ugi) throws IOException {
+      CompletionService<Path> cs, UserGroupInformation ugi)
+      throws IOException {
     while (true) {
       try {
         LocalizerStatus status = createStatus();
@@ -231,7 +239,7 @@ public class ContainerLocalizer {
                 break;
               }
               // TODO: Synchronization??
-              pendingResources.put(r, exec.submit(download(lda, r, ugi)));
+              pendingResources.put(r, cs.submit(download(lda, r, ugi)));
             }
           }
           break;
@@ -247,8 +255,7 @@ public class ContainerLocalizer {
           } catch (YarnRemoteException e) { }
           return;
         }
-        // TODO HB immediately when rsrc localized
-        sleep(1);
+        cs.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         return;
       } catch (YarnRemoteException e) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1224971&r1=1224970&r2=1224971&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Tue Dec 27 18:27:51 2011
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -146,7 +147,8 @@ public class TestContainerLocalizer {
 
     // return result instantly for deterministic test
     ExecutorService syncExec = mock(ExecutorService.class);
-    when(syncExec.submit(isA(Callable.class)))
+    CompletionService<Path> cs = mock(CompletionService.class);
+    when(cs.submit(isA(Callable.class)))
       .thenAnswer(new Answer<Future<Path>>() {
           @Override
           public Future<Path> answer(InvocationOnMock invoc)
@@ -159,6 +161,7 @@ public class TestContainerLocalizer {
           }
         });
     doReturn(syncExec).when(localizer).createDownloadThreadPool();
+    doReturn(cs).when(localizer).createCompletionService(syncExec);
 
     // run localization
     assertEquals(0, localizer.runLocalization(nmAddr));