You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/08 08:44:13 UTC

git commit: TEZ-536. Fix a couple of bugs in the BroadcastShuffleManager which were causing it to not fetch inputs at times. (sseth)

Updated Branches:
  refs/heads/master 4dd5e195d -> a181fba3f


TEZ-536. Fix a couple of bugs in the BroadcastShuffleManager which were
causing it to not fetch inputs at times. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a181fba3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a181fba3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a181fba3

Branch: refs/heads/master
Commit: a181fba3f72117d4658751fce79ed9db920c6343
Parents: 4dd5e19
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 7 23:43:29 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 7 23:43:29 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/runtime/api/impl/TaskSpec.java   |  2 +-
 .../BroadcastShuffleInputEventHandler.java      |  3 +-
 .../input/BroadcastShuffleManager.java          | 85 ++++++++++++--------
 .../broadcast/output/FileBasedKVWriter.java     |  2 +
 4 files changed, 55 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 6e0995a..534de99 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -128,7 +128,7 @@ public class TaskSpec implements Writable {
   public String toString() {
     StringBuffer sb = new StringBuffer();
     sb.append("TaskAttemptID:" + taskAttemptId);
-    sb.append("processorName=" + processorDescriptor.getClassName()
+    sb.append(", processorName=" + processorDescriptor.getClassName()
         + ", inputSpecListSize=" + inputSpecList.size()
         + ", outputSpecListSize=" + outputSpecList.size());
     sb.append(", inputSpecList=[");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index 38f6e6c..cd50ec6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -29,7 +29,6 @@ import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandler;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
 import com.google.common.base.Preconditions;
@@ -38,7 +37,7 @@ import com.google.protobuf.TextFormat;
 
 public class BroadcastShuffleInputEventHandler {
 
-  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
   
   private final BroadcastShuffleManager shuffleManager;
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index fd47757..82c7f37 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -90,9 +91,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
   private final BlockingQueue<FetchedInput> completedInputs;
   private final Set<InputIdentifier> completedInputSet;
-//  private final Set<InputIdentifier> pendingInputs;
   private final ConcurrentMap<String, InputHost> knownSrcHosts;
-  private final Set<InputHost> pendingHosts;
+  private final BlockingQueue<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
   
   private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
@@ -133,7 +133,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
     knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
-    pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
+    pendingHosts = new LinkedBlockingQueue<InputHost>();
     obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
     
     int maxConfiguredFetchers = 
@@ -196,41 +196,50 @@ public class BroadcastShuffleManager implements FetcherCallback {
           } finally {
             lock.unlock();
           }
-          if (shuffleError != null) {
-            // InputContext has already been informed of a fatal error.
-            // Initiate shutdown.
-            break;
-          }
-          
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("NumCompletedInputs: " + numCompletedInputs);
-          }
-          if (numCompletedInputs.get() < numInputs) {
-            lock.lock();
-            try {
-              int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
-              int count = 0;
-              for (Iterator<InputHost> inputHostIter = pendingHosts.iterator() ; inputHostIter.hasNext() ; ) {
-                InputHost inputHost = inputHostIter.next();
-                inputHostIter.remove();
-                if (inputHost.getNumPendingInputs() > 0) {
-                  Fetcher fetcher = constructFetcherForHost(inputHost);
-                  LOG.info("Scheduling fetch for inputHost: " + inputHost);
-                  numRunningFetchers.incrementAndGet();
-                  ListenableFuture<FetchResult> future = fetcherExecutor
-                      .submit(fetcher);
-                  Futures.addCallback(future, fetchFutureCallback);
-                  if (++count >= numFetchersToRun) {
-                    break;
-                  }
+        }
+        if (shuffleError != null) {
+          // InputContext has already been informed of a fatal error.
+          // Initiate shutdown.
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+        }
+        if (numCompletedInputs.get() < numInputs) {
+          lock.lock();
+          try {
+            int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers
+                - numRunningFetchers.get());
+            int count = 0;
+            while (pendingHosts.peek() != null) {
+              InputHost inputHost = pendingHosts.take();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing pending host: " + inputHost);
+              }
+              if (inputHost.getNumPendingInputs() > 0) {
+                LOG.info("Scheduling fetch for inputHost: " + inputHost);
+                Fetcher fetcher = constructFetcherForHost(inputHost);
+                numRunningFetchers.incrementAndGet();
+                ListenableFuture<FetchResult> future = fetcherExecutor
+                    .submit(fetcher);
+                Futures.addCallback(future, fetchFutureCallback);
+                if (++count >= numFetchersToRun) {
+                  break;
+                }
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Skipping host: " + inputHost.getHost()
+                      + " since it has know inputs to process");
                 }
               }
-            } finally {
-              lock.unlock();
             }
+          } finally {
+            lock.unlock();
           }
         }
       }
+      LOG.info("Shutting down FetchScheduler");
       // TODO NEWTEZ Maybe clean up inputs.
       if (!fetcherExecutor.isShutdown()) {
         fetcherExecutor.shutdownNow();
@@ -282,10 +291,18 @@ public class BroadcastShuffleManager implements FetcherCallback {
         host = old;
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+    }
     host.addKnownInput(srcAttemptIdentifier);
     lock.lock();
     try {
-      pendingHosts.add(host);
+      boolean added = pendingHosts.offer(host);
+      if (!added) {
+        String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
+        LOG.error(errorMessage);
+        throw new TezUncheckedException(errorMessage);
+      }
       wakeLoop.signal();
     } finally {
       lock.unlock();
@@ -335,7 +352,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
       long copyDuration) throws IOException {
     InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
 
-    LOG.info("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+    LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
 
     // Count irrespective of whether this is a copy of an already fetched input
     lock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 9941de0..e6dc581 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -104,6 +104,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
 
     Path indexFile = ouputFileManager
         .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+    LOG.info("Writing index file: " + indexFile);
     sr.writeToFile(indexFile, conf);
     return numRecords > 0;
   }
@@ -116,6 +117,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
 
   public void initWriter() throws IOException {
     Path outputFile = ouputFileManager.getOutputFileForWrite();
+    LOG.info("Writing data file: " + outputFile);
 
     // TODO NEWTEZ Consider making the buffer size configurable. Also consider
     // setting up an in-memory buffer which is occasionally flushed to disk so