You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:28 UTC

[11/50] [abbrv] hadoop git commit: YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)

YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47279c32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47279c32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47279c32

Branch: refs/heads/HDFS-7240
Commit: 47279c3228185548ed09c36579b420225e4894f5
Parents: 22b70e7
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sun Apr 26 09:13:46 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sun Apr 26 09:13:46 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../container/ContainerImpl.java                |  8 ++-
 .../localizer/ResourceLocalizationService.java  | 53 +++++++++++++++-----
 .../localizer/event/LocalizationEventType.java  |  1 +
 .../TestResourceLocalizationService.java        | 12 ++++-
 5 files changed, 62 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a626f82..87db291 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -268,6 +268,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore
     invoked (Brahma Reddy Battula via jlowe)
 
+    YARN-3464. Race condition in LocalizerRunner kills localizer before 
+    localizing all resources. (Zhihai Xu via kasha)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index c9874a6..68669aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -716,7 +718,12 @@ public class ContainerImpl implements Container {
         return ContainerState.LOCALIZING;
       }
 
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizationEvent(LocalizationEventType.
+              CONTAINER_RESOURCES_LOCALIZED, container));
+
       container.sendLaunchEvent();
+      container.metrics.endInitingContainer();
 
       // If this is a recovered container that has already launched, skip
       // uploading resources to the shared cache. We do this to avoid uploading
@@ -734,7 +741,6 @@ public class ContainerImpl implements Container {
                 SharedCacheUploadEventType.UPLOAD));
       }
 
-      container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 611fe80..cdd252c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService
     case INIT_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
+    case CONTAINER_RESOURCES_LOCALIZED:
+      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+      break;
     case CACHE_CLEANUP:
       handleCacheCleanup(event);
       break;
@@ -455,7 +460,18 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
   }
-  
+
+  /**
+   * Once a container's resources are localized, kill the corresponding
+   * {@link ContainerLocalizer}
+   */
+  private void handleContainerResourcesLocalized(
+      ContainerLocalizationEvent event) {
+    Container c = event.getContainer();
+    String locId = ConverterUtils.toString(c.getContainerId());
+    localizerTracker.endContainerLocalization(locId);
+  }
+
   private void handleCacheCleanup(LocalizationEvent event) {
     ResourceRetentionSet retain =
       new ResourceRetentionSet(delService, cacheTargetSize);
@@ -670,7 +686,7 @@ public class ResourceLocalizationService extends CompositeService
           response.setLocalizerAction(LocalizerAction.DIE);
           return response;
         }
-        return localizer.update(status.getResources());
+        return localizer.processHeartbeat(status.getResources());
       }
     }
     
@@ -724,6 +740,17 @@ public class ResourceLocalizationService extends CompositeService
         localizer.interrupt();
       }
     }
+
+    public void endContainerLocalization(String locId) {
+      LocalizerRunner localizer;
+      synchronized (privLocalizers) {
+        localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore
+        }
+      }
+      localizer.endContainerLocalization();
+    }
   }
   
 
@@ -878,6 +905,7 @@ public class ResourceLocalizationService extends CompositeService
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     // Its a shared list between Private Localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
+    private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
 
     // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
@@ -898,6 +926,10 @@ public class ResourceLocalizationService extends CompositeService
       pending.add(request);
     }
 
+    public void endContainerLocalization() {
+      killContainerLocalizer.set(true);
+    }
+
     /**
      * Find next resource to be given to a spawned localizer.
      * 
@@ -944,7 +976,7 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
-    LocalizerHeartbeatResponse update(
+    LocalizerHeartbeatResponse processHeartbeat(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@@ -953,7 +985,7 @@ public class ResourceLocalizationService extends CompositeService
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
 
-      LocalizerAction action = LocalizerAction.LIVE;
+      boolean fetchFailed = false;
       // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
@@ -989,7 +1021,7 @@ public class ResourceLocalizationService extends CompositeService
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
             LOG.warn(req + " failed: " + diagnostics);
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, diagnostics));
@@ -1001,15 +1033,15 @@ public class ResourceLocalizationService extends CompositeService
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
-      if (action == LocalizerAction.DIE) {
-        response.setLocalizerAction(action);
+      if (fetchFailed || killContainerLocalizer.get()) {
+        response.setLocalizerAction(LocalizerAction.DIE);
         return response;
       }
 
@@ -1037,12 +1069,9 @@ public class ResourceLocalizationService extends CompositeService
         } catch (URISyntaxException e) {
             //TODO fail? Already translated several times...
         }
-      } else if (pending.isEmpty()) {
-        // TODO: Synchronization
-        action = LocalizerAction.DIE;
       }
 
-      response.setLocalizerAction(action);
+      response.setLocalizerAction(LocalizerAction.LIVE);
       response.setResourceSpecs(rsrcs);
       return response;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 5134349..4785fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -23,4 +23,5 @@ public enum LocalizationEventType {
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
+  CONTAINER_RESOURCES_LOCALIZED,
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index d3c3521..2edaf45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -975,7 +976,8 @@ public class TestResourceLocalizationService {
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
         .thenReturn(Collections.singletonList(rsrc1success))
         .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(rsrcs4);
+        .thenReturn(rsrcs4)
+        .thenReturn(Collections.<LocalResourceStatus>emptyList());
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
@@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
           localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
-      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      spyService.handle(new ContainerLocalizationEvent(
+          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+      // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());