You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/10 20:08:30 UTC

git commit: TEZ-548. Fix a bug in BroadcastShuffleManager which was causing it to hang if an error occurred in the Scheduler thread. Also fixed the error in the thread. (sseth)

Updated Branches:
  refs/heads/master ad5666a9d -> 2f207c0d8


TEZ-548. Fix a bug in BroadcastShuffleManager which was causing it to
hang if an error occurred in the Scheduler thread. Also fixed the error
in the thread. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2f207c0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2f207c0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2f207c0d

Branch: refs/heads/master
Commit: 2f207c0d8cf4117dd30afa315374d5286a8abe46
Parents: ad5666a
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Oct 10 11:07:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Oct 10 11:07:42 2013 -0700

----------------------------------------------------------------------
 .../BroadcastShuffleInputEventHandler.java      |  2 +-
 .../input/BroadcastShuffleManager.java          | 60 +++++++++++++++-----
 .../shuffle/impl/ShuffleInputEventHandler.java  |  3 +-
 .../library/shuffle/common/InputHost.java       | 24 ++++----
 4 files changed, 61 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index cd50ec6..5011140 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -72,7 +72,7 @@ public class BroadcastShuffleInputEventHandler {
     } catch (InvalidProtocolBufferException e) {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
-    LOG.info("Processing data moveement event with srcIndex: "
+    LOG.info("Processing DataMovementEvent with srcIndex: "
         + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
         + ", attemptNum: " + dme.getVersion() + ", payload: "
         + TextFormat.shortDebugString(shufflePayload));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 1f0f590..05b9761 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -89,6 +88,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
   private final ExecutorService fetcherRawExecutor;
   private final ListeningExecutorService fetcherExecutor;
 
+  private final ExecutorService schedulerRawExecutor;
+  private final ListeningExecutorService schedulerExecutor;
+  
   private final BlockingQueue<FetchedInput> completedInputs;
   private final Set<InputIdentifier> completedInputSet;
   private final ConcurrentMap<String, InputHost> knownSrcHosts;
@@ -99,9 +101,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
   
   private final long startTime;
   private long lastProgressTime;
-  
-  private FutureTask<Void> runShuffleFuture;
-  
+
   // Required to be held when manipulating pendingHosts
   private ReentrantLock lock = new ReentrantLock();
   private Condition wakeLoop = lock.newCondition();
@@ -143,11 +143,24 @@ public class BroadcastShuffleManager implements FetcherCallback {
     
     this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
     
-    this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(
+        numFetchers,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
             .build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
+    this.schedulerRawExecutor = Executors.newFixedThreadPool(
+        1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+            .build());
+    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+    
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
     
@@ -178,10 +191,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
   
   public void run() {
     RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
-    runShuffleFuture = new FutureTask<Void>(callable);
-    Thread runThread = new Thread(runShuffleFuture, "ShuffleRunner");
-    runThread.setDaemon(true);
-    runThread.start();
+    ListenableFuture<Void> runShuffleFuture = schedulerExecutor
+        .submit(callable);
+    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+    // Shut down this executor once this task and its callback complete.
+    schedulerExecutor.shutdown();
   }
   
   private class RunBroadcastShuffleCallable implements Callable<Void> {
@@ -209,22 +223,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
         if (numCompletedInputs.get() < numInputs) {
           lock.lock();
           try {
-            int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers
-                - numRunningFetchers.get());
+            int maxFetchersToRun = numFetchers - numRunningFetchers.get();
             int count = 0;
             while (pendingHosts.peek() != null) {
               InputHost inputHost = pendingHosts.take();
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing pending host: " + inputHost);
+                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
               }
               if (inputHost.getNumPendingInputs() > 0) {
-                LOG.info("Scheduling fetch for inputHost: " + inputHost);
+                LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
                 Fetcher fetcher = constructFetcherForHost(inputHost);
                 numRunningFetchers.incrementAndGet();
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);
                 Futures.addCallback(future, fetchFutureCallback);
-                if (++count >= numFetchersToRun) {
+                if (++count >= maxFetchersToRun) {
                   break;
                 }
               } else {
@@ -276,6 +289,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
     // fetcher, especially in the case where #hosts < #fetchers
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
         pendingInputsForHost);
+    LOG.info("Created Fetcher for host: " + inputHost.getHost()
+        + ", with inputs: " + pendingInputsForHost);
     return fetcherBuilder.build();
   }
   
@@ -502,6 +517,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
   }
   
   
+  private class SchedulerFutureCallback implements FutureCallback<Void> {
+
+    @Override
+    public void onSuccess(Void result) {
+      LOG.info("Scheduler thread completed");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error("Scheduler failed with error: ", t);
+      inputContext.fatalError(t, "Broadcast Scheduler Failed");
+    }
+    
+  }
+  
   private class FetchFutureCallback implements FutureCallback<FetchResult> {
 
     private void doBookKeepingForFetcherComplete() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index d731a46..8fae1c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -74,9 +74,8 @@ public class ShuffleInputEventHandler {
     } 
     int partitionId = dmEvent.getSourceIndex();
     URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-    LOG.info("Data movement event baseUri:" + baseUri);
-
     InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+    LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
     scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
     
     // TODO NEWTEZ See if this duration hack can be removed.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 4759a8b..7905e27 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -19,8 +19,9 @@
 package org.apache.tez.runtime.library.shuffle.common;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -30,7 +31,7 @@ public class InputHost {
   private final String host;
   private final int port;
 
-  private final List<InputAttemptIdentifier> inputs = new LinkedList<InputAttemptIdentifier>();
+  private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
 
   public InputHost(String hostName, int port, ApplicationId appId) {
     this.host = hostName;
@@ -45,18 +46,18 @@ public class InputHost {
     return this.port;
   }
 
-  public synchronized int getNumPendingInputs() {
+  public int getNumPendingInputs() {
     return inputs.size();
   }
   
-  public synchronized void addKnownInput(InputAttemptIdentifier srcAttempt) {
+  public void addKnownInput(InputAttemptIdentifier srcAttempt) {
     inputs.add(srcAttempt);
   }
 
-  public synchronized List<InputAttemptIdentifier> clearAndGetPendingInputs() {
+  public List<InputAttemptIdentifier> clearAndGetPendingInputs() {
     List<InputAttemptIdentifier> inputsCopy = new ArrayList<InputAttemptIdentifier>(
-        inputs);
-    inputs.clear();
+        inputs.size());
+    inputs.drainTo(inputsCopy);
     return inputsCopy;
   }
 
@@ -88,10 +89,13 @@ public class InputHost {
     return true;
   }
 
-  @Override
-  public String toString() {
+  public String toDetailedString() {
     return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
         + "]";
   }
-
+  
+  @Override
+  public String toString() {
+    return "InputHost [host=" + host + ", port=" + port + "]";
+  }
 }