You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:29:15 UTC

[1/2] TEZ-911. Re-factor BroadcastShuffle related code to be independent of Broadcast. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 351a61058 -> 0df108154


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
new file mode 100644
index 0000000..6b91558
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -0,0 +1,706 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.shuffle.common.FetchResult;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
+import org.apache.tez.runtime.library.shuffle.common.InputHost;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class ShuffleManager implements FetcherCallback {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
+  
+  private final TezInputContext inputContext;
+  private final Configuration conf;
+  private final int numInputs;
+
+  private ShuffleEventHandler inputEventHandler;
+  private FetchedInputAllocator inputManager;
+  
+  private ExecutorService fetcherRawExecutor;
+  private ListeningExecutorService fetcherExecutor;
+
+  private ExecutorService schedulerRawExecutor;
+  private ListeningExecutorService schedulerExecutor;
+  private RunShuffleCallable schedulerCallable = new RunShuffleCallable();
+  
+  private BlockingQueue<FetchedInput> completedInputs;
+  private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
+  private Set<InputIdentifier> completedInputSet;
+  private ConcurrentMap<String, InputHost> knownSrcHosts;
+  private BlockingQueue<InputHost> pendingHosts;
+  private Set<InputAttemptIdentifier> obsoletedInputs;
+  
+  private AtomicInteger numCompletedInputs = new AtomicInteger(0);
+  
+  private long startTime;
+  private long lastProgressTime;
+
+  // Required to be held when manipulating pendingHosts
+  private ReentrantLock lock = new ReentrantLock();
+  private Condition wakeLoop = lock.newCondition();
+  
+  private int numFetchers;
+  private AtomicInteger numRunningFetchers = new AtomicInteger(0);
+  
+  // Parameters required by Fetchers
+  private SecretKey shuffleSecret;
+  private int connectionTimeout;
+  private int readTimeout;
+  private CompressionCodec codec;
+  
+  private int ifileBufferSize;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+  
+  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+  
+  private volatile Throwable shuffleError;
+  
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter failedShufflesCounter;
+  private final TezCounter bytesShuffledCounter;
+  private final TezCounter decompressedDataSizeCounter;
+  private final TezCounter bytesShuffledToDiskCounter;
+  private final TezCounter bytesShuffledToMemCounter;
+  
+  // TODO More counters - FetchErrors, speed?
+  
+  /**
+   * Creates a ShuffleManager and resolves the shuffle-related task counters
+   * from the input context. Executors and queues are not created here; they
+   * are set up by configureAndStart(), which run() invokes.
+   *
+   * @param inputContext context used for counters, identifiers, events and error reporting
+   * @param conf configuration later read for the fetcher count and timeouts
+   * @param numInputs number of inputs that must complete before the shuffle is considered done
+   * @throws IOException declared by the signature
+   */
+  public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+    
+    // Counters are looked up once and cached for use by the fetch callbacks.
+    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+  }
+  
+  /**
+   * Stores the IFile settings. The read-ahead flag and length are handed to
+   * every Fetcher built by this class; within this class the buffer size is
+   * only reported in the startup log.
+   *
+   * @param bufferSize IFile buffer size
+   * @param readAhead whether IFile read-ahead is enabled
+   * @param readAheadLength read-ahead length passed to fetchers
+   */
+  public void setIfileParameters(int bufferSize, boolean readAhead, int readAheadLength) {
+    this.ifileBufferSize = bufferSize;
+    this.ifileReadAhead = readAhead;
+    this.ifileReadAheadLength = readAheadLength;
+  }
+  
+  /**
+   * Sets the codec passed to fetchers. When left null, fetchers are built
+   * without compression parameters.
+   *
+   * @param codec compression codec, or null for none
+   */
+  public void setCompressionCodec(CompressionCodec codec) {
+    this.codec = codec;
+  }
+  
+  /**
+   * Sets the handler that handleEvents(List) delegates to. run() fails its
+   * precondition check if this has not been set.
+   *
+   * @param eventHandler handler for incoming events
+   */
+  public void setInputEventHandler(ShuffleEventHandler eventHandler) {
+    this.inputEventHandler = eventHandler;
+  }
+  
+  /**
+   * Sets the allocator handed to each Fetcher built by this class. run()
+   * fails its precondition check if this has not been set.
+   *
+   * @param allocator allocator passed to the FetcherBuilder
+   */
+  public void setFetchedInputAllocator(FetchedInputAllocator allocator) {
+    this.inputManager = allocator;
+  }
+
+
+  /**
+   * Creates the bookkeeping collections and both executors, reads the shuffle
+   * secret and timeouts, and logs the effective settings. Called from run()
+   * before the scheduler task is submitted.
+   *
+   * @throws IOException declared by the signature
+   */
+  private void configureAndStart() throws IOException {
+    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    // Bounded at numInputs: each input is registered as completed at most once.
+    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
+    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    pendingHosts = new LinkedBlockingQueue<InputHost>();
+    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+    
+    int maxConfiguredFetchers = 
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+    
+    // Never start more fetcher threads than there are inputs to fetch.
+    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
+    
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(
+        numFetchers,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
+            .build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+    
+    // Single-threaded executor that runs the scheduling loop.
+    this.schedulerRawExecutor = Executors.newFixedThreadPool(
+        1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+            .build());
+    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+    
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+    
+    this.shuffleSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    
+    // NOTE(review): the connect timeout falls back to the STALLED_COPY default
+    // rather than a CONNECT-specific constant - confirm this is intentional.
+    this.connectionTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+    
+    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+        + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+        + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength);
+  }
+
+  /**
+   * Starts the shuffle: verifies that the allocator and event handler have
+   * been set, initialises internal state, and submits the scheduling loop to
+   * the scheduler executor. Returns without waiting for the loop to finish.
+   *
+   * @throws IllegalStateException if the allocator or event handler is not set
+   * @throws IOException if configureAndStart() fails
+   */
+  public void run() throws IOException {
+    Preconditions.checkState(inputManager != null, "InputManager must be configured");
+    Preconditions.checkState(inputEventHandler != null, "InputEventHandler must be configured");
+    
+    configureAndStart();
+    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
+    // The callback reports scheduler failures to the input context.
+    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+    // Shutdown this executor once this task, and the callback complete.
+    schedulerExecutor.shutdown();
+  }
+  
+  /**
+   * Scheduling loop. Until shutdown is requested or all inputs have completed,
+   * it waits for a free fetcher slot and a pending host, then drains
+   * pendingHosts and submits one Fetcher per host that still has pending
+   * inputs. On exit it shuts down the fetcher executor.
+   */
+  private class RunShuffleCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
+        lock.lock();
+        try {
+          // Nothing to schedule: wait until a fetcher finishes, a host is
+          // added, or an input completes (all of which signal wakeLoop).
+          if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
+            if (numCompletedInputs.get() < numInputs) {
+              wakeLoop.await();
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+
+        if (shuffleError != null) {
+          // InputContext has already been informed of a fatal error. Relying on
+          // tez to kill the task.
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+        }
+        if (numCompletedInputs.get() < numInputs) {
+          lock.lock();
+          try {
+            int maxFetchersToRun = numFetchers - numRunningFetchers.get();
+            int count = 0;
+            while (pendingHosts.peek() != null) {
+              InputHost inputHost = null;
+              try {
+                inputHost = pendingHosts.take();
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
+              }
+              if (inputHost.getNumPendingInputs() > 0) {
+                // Stop scheduling once shutdown has been requested. Submitting
+                // to an executor that has already been shut down would be
+                // rejected, and the running-fetcher count must not be bumped
+                // for a fetcher that never runs.
+                if (isShutdown.get()) {
+                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
+                }
+                LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
+                Fetcher fetcher = constructFetcherForHost(inputHost);
+                numRunningFetchers.incrementAndGet();
+                ListenableFuture<FetchResult> future = fetcherExecutor
+                    .submit(fetcher);
+                Futures.addCallback(future, fetchFutureCallback);
+                if (++count >= maxFetchersToRun) {
+                  break;
+                }
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Skipping host: " + inputHost.getHost()
+                      + " since it has no inputs to process");
+                }
+              }
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+  }
+  
+  /**
+   * Builds a Fetcher for the given host. The host's pending inputs are taken
+   * (and cleared on the host), filtered to drop attempts whose input has
+   * already completed or which have been marked obsolete, and assigned to the
+   * fetcher. Obsolete attempts are also removed from the obsolete set.
+   *
+   * @param inputHost host whose pending inputs should be fetched
+   * @return a Fetcher configured with this manager's connection, compression
+   *         and IFile settings
+   */
+  private Fetcher constructFetcherForHost(InputHost inputHost) {
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(
+        ShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, conf);
+    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+    if (codec != null) {
+      fetcherBuilder.setCompressionParameters(codec);
+    }
+    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
+
+    // Remove obsolete inputs from the list being given to the fetcher. Also
+    // remove from the obsolete list.
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      // Attempts whose input has already completed need not be fetched again.
+      boolean alreadyCompleted = completedInputSet.contains(input.getInputIdentifier());
+      // Set.remove reports whether the attempt was marked OBSOLETE and clears
+      // the marker in the same step.
+      boolean obsolete = obsoletedInputs.remove(input);
+      // Remove at most once per element: a second Iterator.remove() after a
+      // single next() throws IllegalStateException.
+      if (alreadyCompleted || obsolete) {
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+        pendingInputsForHost);
+    LOG.info("Created Fetcher for host: " + inputHost.getHost()
+        + ", with inputs: " + pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+  
+  /////////////////// Methods for InputEventHandler
+  
+  /**
+   * Records a source attempt as available on the given host and queues the
+   * host for fetching, waking the scheduler loop.
+   *
+   * @param hostName host serving the input; used as the key for known hosts
+   * @param port port used when a new InputHost is created for hostName
+   * @param srcAttemptIdentifier the source attempt to fetch
+   * @param partition not used by this method
+   * @throws TezUncheckedException if the host cannot be added to the pending queue
+   */
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+    InputHost targetHost = knownSrcHosts.get(hostName);
+    if (targetHost == null) {
+      // Another thread may register the same host concurrently; keep whichever
+      // instance made it into the map.
+      InputHost candidate = new InputHost(hostName, port, inputContext.getApplicationId());
+      InputHost existing = knownSrcHosts.putIfAbsent(hostName, candidate);
+      targetHost = (existing == null) ? candidate : existing;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + targetHost);
+    }
+    targetHost.addKnownInput(srcAttemptIdentifier);
+    lock.lock();
+    try {
+      if (!pendingHosts.offer(targetHost)) {
+        String errorMessage = "Unable to add host: " + targetHost.getHost() + " to pending queue";
+        LOG.error(errorMessage);
+        throw new TezUncheckedException(errorMessage);
+      }
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Marks an input as complete when the source produced no data for it. A
+   * NullFetchedInput placeholder is registered unless the input has already
+   * completed, and the scheduler loop is then signalled.
+   *
+   * @param srcAttemptIdentifier the source attempt that produced no data
+   */
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    
+    // Unsynchronized check first, then re-check while holding the monitor so
+    // that only one attempt per input is registered.
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
+        }
+      }
+    }
+
+    // Awake the loop to check for termination.
+    lock.lock();
+    try {
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Registers an input whose data arrived via an event. The first attempt to
+   * complete for an input is committed and registered; any later attempt for
+   * the same input is aborted.
+   *
+   * @param srcAttemptIdentifier the source attempt the data belongs to
+   * @param fetchedInput the already-populated input
+   * @throws IOException if commit() or abort() on the fetched input fails
+   */
+  public void addCompletedInputWithData(
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
+        + fetchedInput.getType());
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+
+    boolean committed = false;
+    // Unsynchronized check first, then re-check while holding the monitor so
+    // that only one attempt per input is committed.
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          registerCompletedInput(fetchedInput);
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another
+                            // abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Marks a source attempt as obsolete. The marker is consulted, and cleared,
+   * when the list of inputs for a new Fetcher is built; fetchers that are
+   * already running are not informed.
+   *
+   * @param srcAttemptIdentifier the attempt that should no longer be fetched
+   */
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+  }
+  
+  
+  /**
+   * Forwards the events to the configured ShuffleEventHandler.
+   *
+   * @param events events to process
+   * @throws IOException if the handler fails
+   */
+  public void handleEvents(List<Event> events) throws IOException {
+    inputEventHandler.handleEvents(events);
+  }
+
+  /////////////////// End of Methods for InputEventHandler
+  /////////////////// Methods from FetcherCallbackHandler
+  
+  /**
+   * Fetcher callback for a successfully fetched attempt. The first attempt to
+   * complete for an input is committed, counted and registered; any later
+   * attempt for the same input is aborted and not counted.
+   *
+   * @param host not used by this method
+   * @param srcAttemptIdentifier the attempt that was fetched
+   * @param fetchedInput the fetched data
+   * @param fetchedBytes byte count added to the shuffle byte counters
+   * @param decompressedLength byte count added to the decompressed-size counter
+   * @param copyDuration not used by this method
+   * @throws IOException if commit() or abort() on the fetched input fails
+   */
+  @Override
+  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
+      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
+
+    LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+    
+    boolean committed = false;
+    // Unsynchronized check first, then re-check while holding the monitor so
+    // that only one attempt per input is committed.
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          
+          // Processing counters for completed and commit fetches only. Need
+          // additional counters for excessive fetches - which primarily comes
+          // in after speculation or retries.
+          shuffledInputsCounter.increment(1);
+          bytesShuffledCounter.increment(fetchedBytes);
+          if (fetchedInput.getType() == Type.MEMORY) {
+            bytesShuffledToMemCounter.increment(fetchedBytes);
+          } else {
+            bytesShuffledToDiskCounter.increment(fetchedBytes);
+          }
+          decompressedDataSizeCounter.increment(decompressedLength);
+
+          registerCompletedInput(fetchedInput);
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+  }
+
+  /**
+   * Fetcher callback for a failed fetch. Increments the failed-shuffle counter
+   * and either reports a fatal error (when the source attempt is unknown) or
+   * sends an InputReadErrorEvent for the failed attempt.
+   *
+   * @param host not used by this method
+   * @param srcAttemptIdentifier the attempt whose fetch failed; may be null
+   * @param connectFailed whether the failure was a connection failure (logged only)
+   */
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    // srcAttemptIdentifier may be null here, so guard the dereference.
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+        + ", InputIdentifier: "
+        + (srcAttemptIdentifier == null ? null : srcAttemptIdentifier.getInputIdentifier())
+        + ", connectFailed: " + connectFailed);
+    failedShufflesCounter.increment(1);
+    if (srcAttemptIdentifier == null) {
+      String message = "Received fetchFailure for an unknown src (null)";
+      LOG.fatal(message);
+      inputContext.fatalError(null, message);
+    } else {
+      InputReadErrorEvent readError = new InputReadErrorEvent(
+          "Fetch failure while fetching from "
+              + TezRuntimeUtils.getTaskAttemptIdentifier(
+                  inputContext.getSourceVertexName(),
+                  srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+                  srcAttemptIdentifier.getAttemptNumber()),
+          srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+          srcAttemptIdentifier.getAttemptNumber());
+
+      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+      failedEvents.add(readError);
+      inputContext.sendEvents(failedEvents);
+    }
+  }
+  /////////////////// End of Methods from FetcherCallbackHandler
+
+  /**
+   * Requests shutdown: sets the shutdown flag and calls shutdownNow() on the
+   * scheduler and fetcher executors if they exist and are not already shut
+   * down. Does not wait for running tasks to finish.
+   *
+   * @throws InterruptedException declared by the signature
+   */
+  public void shutdown() throws InterruptedException {
+    isShutdown.set(true);
+    if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+      this.schedulerExecutor.shutdownNow(); // Interrupt the scheduler loop
+    }
+    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+      this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
+    }
+  }
+  
+  /**
+   * Records a completed input while holding the lock: adds its identifier to
+   * the completed set, queues it for consumers, notifies the input context
+   * the first time any input becomes available, and bumps the completed count.
+   * Callers in this class check for duplicates before invoking it.
+   *
+   * @param fetchedInput the completed input (may be a NullFetchedInput placeholder)
+   */
+  private void registerCompletedInput(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+      // add() throws if the bounded queue is full; its capacity is numInputs.
+      completedInputs.add(fetchedInput);
+      if (!inputReadyNotificationSent.getAndSet(true)) {
+        // TODO Should eventually be controlled by Inputs which are processing the data.
+        inputContext.inputIsReady();
+      }
+      numCompletedInputs.incrementAndGet();
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  /////////////////// Methods for walking the available inputs
+  
+  /**
+   * Reports whether the head of the completed-input queue is a real input.
+   * An empty queue, or a no-data placeholder at the head, counts as "nothing
+   * available".
+   *
+   * @return true if there is another input ready for consumption.
+   */
+  public boolean newInputAvailable() {
+    FetchedInput next = completedInputs.peek();
+    return next != null && !(next instanceof NullFetchedInput);
+  }
+
+  /**
+   * Compares the completed-input count with the expected number of inputs
+   * while holding the lock.
+   *
+   * @return true if all of the required inputs have been fetched.
+   */
+  public boolean allInputsFetched() {
+    final boolean everythingFetched;
+    lock.lock();
+    try {
+      everythingFetched = (numInputs == numCompletedInputs.get());
+    } finally {
+      lock.unlock();
+    }
+    return everythingFetched;
+  }
+
+  /**
+   * Returns the next completed input that carries data, skipping no-data
+   * placeholders.
+   *
+   * @return the next available input, or null if the queue is empty and all
+   *         inputs have been fetched. This method blocks if there are
+   *         currently no available inputs but more may become available.
+   * @throws InterruptedException if interrupted while waiting for an input
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    FetchedInput input = null;
+    do {
+      // Check for no additional inputs
+      lock.lock();
+      try {
+        // The peeked value is only used for the emptiness check; on break it
+        // is null, which is what gets returned.
+        input = completedInputs.peek();
+        if (input == null && allInputsFetched()) {
+          break;
+        }
+      } finally {
+        lock.unlock();
+      }
+      input = completedInputs.take(); // block
+    } while (input instanceof NullFetchedInput); // placeholders are not handed to consumers
+    return input;
+  }
+  /////////////////// End of methods for walking the available inputs
+
+
+  /**
+   * Fake input that is added to the completed input list in case an input does not have any data.
+   * It only carries the attempt identifier; every data and lifecycle operation
+   * throws UnsupportedOperationException. Consumers never receive it:
+   * getNextInput() skips it and newInputAvailable() treats it as absent.
+   */
+  private class NullFetchedInput extends FetchedInput {
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+  }
+  
+  
+  /**
+   * Callback attached to the scheduler task. Normal completion is logged; a
+   * failure is reported to the input context as fatal unless shutdown has
+   * already been requested.
+   */
+  private class SchedulerFutureCallback implements FutureCallback<Void> {
+
+    @Override
+    public void onSuccess(Void result) {
+      LOG.info("Scheduler thread completed");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (!isShutdown.get()) {
+        LOG.error("Scheduler failed with error: ", t);
+        inputContext.fatalError(t, "Shuffle Scheduler Failed");
+        return;
+      }
+      // Failures after shutdown are expected and only logged.
+      LOG.info("Already shutdown. Ignoring error: " + t);
+    }
+
+  }
+  
+  /**
+   * Callback attached to every submitted Fetcher. On success, inputs the
+   * fetcher did not finish are handed back to their host and the host is
+   * re-queued. On failure outside of shutdown, the error is recorded and
+   * reported to the input context as fatal.
+   */
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    // Frees the fetcher slot and wakes the scheduler loop so it can schedule
+    // more work or notice termination.
+    private void doBookKeepingForFetcherComplete() {
+      numRunningFetchers.decrementAndGet();
+      lock.lock();
+      try {
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+    
+    @Override
+    public void onSuccess(FetchResult result) {
+      Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
+      if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
+        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        // NOTE(review): with assertions disabled, a missing host would surface
+        // as a NullPointerException below.
+        assert inputHost != null;
+        for (InputAttemptIdentifier input : pendingInputs) {
+          inputHost.addKnownInput(input);
+        }
+        // NOTE(review): added without holding the lock; the signal that
+        // follows in doBookKeepingForFetcherComplete() is sent under the lock.
+        pendingHosts.add(inputHost);
+      }
+      doBookKeepingForFetcherComplete();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (isShutdown.get()) {
+        // No bookkeeping in this branch: the running-fetcher count is left as is.
+        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+      } else {
+        LOG.error("Fetcher failed with error: ", t);
+        // Makes the scheduler loop exit on its next iteration.
+        shuffleError = t;
+        inputContext.fatalError(t, "Fetch failed");
+        doBookKeepingForFetcherComplete();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
new file mode 100644
index 0000000..d8682f3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Hands out a memory-backed or disk-backed FetchedInput for each fetch and
+ * tracks how much of the in-memory budget is currently reserved.
+ *
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ */
+@Private
+public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
+    FetchedInputCallback {
+  private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
+
+  private final Configuration conf;
+  private final String uniqueIdentifier;
+
+  private TezTaskOutputFiles fileNameAllocator;
+  private LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private long memoryLimit;
+  private long maxSingleShuffleLimit;
+
+  private volatile long usedMemory = 0;
+
+  private long maxAvailableTaskMemory;
+  private long initialMemoryAvailable = -1L;
+
+  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
+    this.conf = conf;
+    this.uniqueIdentifier = uniqueIdentifier;
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+  }
+
+  /**
+   * Derives the memory limits from the configuration and creates the helpers
+   * passed to disk-backed inputs. Requires setInitialMemoryAvailable(long) first;
+   * throws IllegalArgumentException if a configured percentage is out of range.
+   */
+  @Private
+  public void configureAndStart() {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial memory must be configured before starting");
+    this.fileNameAllocator = new TezTaskOutputFiles(conf,
+        uniqueIdentifier);
+    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+    // The shared helper validates the buffer percent; never exceed what was granted.
+    long memReq = getInitialMemoryReq(conf, maxAvailableTaskMemory);
+    this.memoryLimit = Math.min(memReq, this.initialMemoryAvailable);
+
+    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
+
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+
+    LOG.info("SimpleFetchedInputAllocator -> " + "MemoryLimit: " +
+    this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+  }
+
+  /**
+   * Memory to request for in-memory fetches: the task memory (configured, else
+   * maxAvailableTaskMemory capped at Integer.MAX_VALUE) times the buffer percent.
+   */
+  @Private
+  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+    // Allow unit tests to fix Runtime memory
+    return (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+  }
+
+  /** Records the memory granted to this allocator; call before configureAndStart(). */
+  @Private
+  public void setInitialMemoryAvailable(long available) {
+    this.initialMemoryAvailable = available;
+  }
+
+  /**
+   * Returns a memory-backed input when actualSize fits both the per-input limit
+   * and the remaining overall limit; otherwise returns a disk-backed input.
+   */
+  @Override
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
+    } else {
+      this.usedMemory += actualSize;
+      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
+      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
+    }
+  }
+
+  /** Accepts DISK and MEMORY inputs; any other type is rejected. */
+  @Override
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    // Not tracking anything here.
+    case DISK:
+    case MEMORY:
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  @Override
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  @Override
+  public synchronized void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /** Gives back the reserved bytes of a MEMORY input; DISK inputs reserve none. */
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      break;
+    case MEMORY:
+      unreserve(fetchedInput.getActualSize());
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
deleted file mode 100644
index f07c5ac..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.junit.Test;
-
-public class TestBroadcastInputManager {
-
-  private static final Log LOG = LogFactory.getLog(TestBroadcastInputManager.class);
-  
-  @Test
-  public void testInMemAllocation() throws IOException {
-    String localDirs = "/tmp/" + this.getClass().getName();
-    Configuration conf = new Configuration();
-    
-    long jvmMax = Runtime.getRuntime().maxMemory();
-    LOG.info("jvmMax: " + jvmMax);
-    
-    float bufferPercent = 0.1f;
-    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
-    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
-    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
-    
-    long inMemThreshold = (long) (bufferPercent * jvmMax);
-    LOG.info("InMemThreshold: " + inMemThreshold);
-
-    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(),
-        conf, Runtime.getRuntime().maxMemory());
-    inputManager.setInitialMemoryAvailable(inMemThreshold);
-    inputManager.configureAndStart();
-
-    long requestSize = (long) (0.4f * inMemThreshold);
-    long compressedSize = 1l;
-    LOG.info("RequestSize: " + requestSize);
-    
-    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
-    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
-    
-    
-    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
-    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
-    
-    
-    // Over limit by this point. Next reserve should give back a DISK allocation
-    FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
-    assertEquals(FetchedInput.Type.DISK, fi3.getType());
-    
-    
-    // Freed one memory allocation. Next should be mem again.
-    fi1.abort();
-    fi1.free();
-    FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
-    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
-    
-    // Freed one disk allocation. Next sould be disk again (no mem freed)
-    fi3.abort();
-    fi3.free();
-    FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
-    assertEquals(FetchedInput.Type.DISK, fi5.getType());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
new file mode 100644
index 0000000..3ec839d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
+import org.junit.Test;
+
+public class TestSimpleFetchedInputAllocator {
+
+  private static final Log LOG = LogFactory.getLog(TestSimpleFetchedInputAllocator.class);
+  /** Memory is handed out until a request no longer fits, then disk; freed memory is reusable. */
+  @Test
+  public void testInMemAllocation() throws IOException {
+    String localDirs = "/tmp/" + this.getClass().getName();
+    Configuration conf = new Configuration();
+    
+    long jvmMax = Runtime.getRuntime().maxMemory();
+    LOG.info("jvmMax: " + jvmMax);
+    
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+    // Use the allocator's own request size: it caps task memory at Integer.MAX_VALUE, raw jvmMax may not.
+    long inMemThreshold = SimpleFetchedInputAllocator.getInitialMemoryReq(conf, jvmMax);
+    LOG.info("InMemThreshold: " + inMemThreshold);
+
+    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(),
+        conf, jvmMax);
+    inputManager.setInitialMemoryAvailable(inMemThreshold);
+    inputManager.configureAndStart();
+
+    long requestSize = (long) (0.4f * inMemThreshold);
+    long compressedSize = 1L;
+    LOG.info("RequestSize: " + requestSize);
+    
+    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+    
+    // A second request of 0.4 * threshold still fits in memory.
+    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+    
+    // About 0.8 of the threshold is reserved, so a third request would exceed
+    // the limit. Next reserve should give back a DISK allocation
+    FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
+    assertEquals(FetchedInput.Type.DISK, fi3.getType());
+    
+    
+    // Freed one memory allocation. Next should be mem again.
+    fi1.abort();
+    fi1.free();
+    FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+    
+    // Freed one disk allocation. Next should be disk again (no mem freed)
+    fi3.abort();
+    fi3.free();
+    FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(5, 1));
+    assertEquals(FetchedInput.Type.DISK, fi5.getType());
+  }
+
+}


[2/2] git commit: TEZ-911. Re-factor BroadcastShuffle related code to be independent of Braodcast. (sseth)

Posted by ss...@apache.org.
TEZ-911. Re-factor BroadcastShuffle related code to be independent of
Braodcast. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0df10815
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0df10815
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0df10815

Branch: refs/heads/master
Commit: 0df108154743ff689e3ba4ff2671e6c809eeb660
Parents: 351a610
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:28:22 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:28:22 2014 -0800

----------------------------------------------------------------------
 .../broadcast/input/BroadcastInputManager.java  | 194 -----
 .../broadcast/input/BroadcastKVReader.java      | 197 -----
 .../BroadcastShuffleInputEventHandler.java      |   9 +-
 .../input/BroadcastShuffleManager.java          | 740 -------------------
 .../readers/ShuffledUnorderedKVReader.java      | 198 +++++
 .../library/input/ShuffledUnorderedKVInput.java | 103 ++-
 .../shuffle/common/ShuffleEventHandler.java     |  28 +
 .../shuffle/common/impl/ShuffleManager.java     | 706 ++++++++++++++++++
 .../impl/SimpleFetchedInputAllocator.java       | 194 +++++
 .../input/TestBroadcastInputManager.java        |  89 ---
 .../impl/TestSimpleFetchedInputAllocator.java   |  90 +++
 11 files changed, 1314 insertions(+), 1234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
deleted file mode 100644
index 4af4404..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
- *
- */
-@Private
-public class BroadcastInputManager implements FetchedInputAllocator,
-    FetchedInputCallback {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
-  
-  private final Configuration conf;
-  private final String uniqueIdentifier;
-
-  private TezTaskOutputFiles fileNameAllocator;
-  private LocalDirAllocator localDirAllocator;
-
-  // Configuration parameters
-  private long memoryLimit;
-  private long maxSingleShuffleLimit;
-
-  private volatile long usedMemory = 0;
-  
-  private long maxAvailableTaskMemory;
-  private long initialMemoryAvailable =-1l;
-
-  public BroadcastInputManager(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
-    this.conf = conf;    
-    this.uniqueIdentifier = uniqueIdentifier;
-    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
-  }
-
-  @Private
-  void configureAndStart() {
-    Preconditions.checkState(initialMemoryAvailable != -1,
-        "Initial memory must be configured before starting");
-    this.fileNameAllocator = new TezTaskOutputFiles(conf,
-        uniqueIdentifier);
-    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-    // Setup configuration
-    final float maxInMemCopyUse = conf.getFloat(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
-          + maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-    
-    if (memReq <= this.initialMemoryAvailable) {
-      this.memoryLimit = memReq;
-    } else {
-      this.memoryLimit = initialMemoryAvailable;
-    }
-
-    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-
-    final float singleShuffleMemoryLimitPercent = conf.getFloat(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
-    
-    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + 
-    this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
-  }
-  
-  @Private
-  static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
-    final float maxInMemCopyUse = conf.getFloat(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
-          + maxInMemCopyUse);
-    }
-    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-    return memReq;
-  }
-
-  @Private
-  void setInitialMemoryAvailable(long available) {
-    this.initialMemoryAvailable = available;
-  }
-
-  @Override
-  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
-      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
-    if (actualSize > maxSingleShuffleLimit
-        || this.usedMemory + actualSize > this.memoryLimit) {
-      return new DiskFetchedInput(actualSize, compressedSize,
-          inputAttemptIdentifier, this, conf, localDirAllocator,
-          fileNameAllocator);
-    } else {
-      this.usedMemory += actualSize;
-      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
-      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
-    }
-  }
-
-  @Override
-  public synchronized void fetchComplete(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    // Not tracking anything here.
-    case DISK:
-    case MEMORY:
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  @Override
-  public synchronized void fetchFailed(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  @Override
-  public synchronized void freeResources(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  private void cleanup(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    case DISK:
-      break;
-    case MEMORY:
-      unreserve(fetchedInput.getActualSize());
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  private synchronized void unreserve(long size) {
-    this.usedMemory -= size;
-    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
deleted file mode 100644
index 2354257..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastKVReader<K, V> implements KeyValueReader {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
-  
-  private final BroadcastShuffleManager shuffleManager;
-  private final CompressionCodec codec;
-  
-  private final Class<K> keyClass;
-  private final Class<V> valClass;
-  private final Deserializer<K> keyDeserializer;
-  private final Deserializer<V> valDeserializer;
-  private final DataInputBuffer keyIn;
-  private final DataInputBuffer valIn;
-
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int ifileBufferSize;
-  
-  private final TezCounter inputRecordCounter;
-  
-  private K key;
-  private V value;
-  
-  private FetchedInput currentFetchedInput;
-  private IFile.Reader currentReader;
-  
-  // TODO Remove this once per I/O counters are separated properly. Relying on
-  // the counter at the moment will generate aggregate numbers. 
-  private int numRecordsRead = 0;
-  
-  public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
-      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
-      TezCounter inputRecordCounter)
-      throws IOException {
-    this.shuffleManager = shuffleManager;
-
-    this.codec = codec;
-    this.ifileReadAhead = ifileReadAhead;
-    this.ifileReadAheadLength = ifileReadAheadLength;
-    this.ifileBufferSize = ifileBufferSize;
-    this.inputRecordCounter = inputRecordCounter;
-
-    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-
-    this.keyIn = new DataInputBuffer();
-    this.valIn = new DataInputBuffer();
-
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(keyIn);
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    this.valDeserializer.open(valIn);
-  }
-
-  // TODO NEWTEZ Maybe add an interface to check whether next will block.
-  
-  /**
-   * Moves to the next key/values(s) pair
-   * 
-   * @return true if another key/value(s) pair exists, false if there are no
-   *         more.
-   * @throws IOException
-   *           if an error occurs
-   */
-  @Override  
-  public boolean next() throws IOException {
-    if (readNextFromCurrentReader()) {
-      inputRecordCounter.increment(1);
-      numRecordsRead++;
-      return true;
-    } else {
-      boolean nextInputExists = moveToNextInput();
-      while (nextInputExists) {
-        if(readNextFromCurrentReader()) {
-          inputRecordCounter.increment(1);
-          numRecordsRead++;
-          return true;
-        }
-        nextInputExists = moveToNextInput();
-      }
-      LOG.info("Num Records read: " + numRecordsRead);
-      return false;
-    }
-  }
-
-  @Override
-  public Object getCurrentKey() throws IOException {
-    return (Object) key;
-  }
-
-  @Override
-  public Object getCurrentValue() throws IOException {
-    return value;
-  }
-
-  /**
-   * Tries reading the next key and value from the current reader.
-   * @return true if the current reader has more records
-   * @throws IOException
-   */
-  private boolean readNextFromCurrentReader() throws IOException {
-    // Initial reader.
-    if (this.currentReader == null) {
-      return false;
-    } else {
-      boolean hasMore = this.currentReader.nextRawKey(keyIn);
-      if (hasMore) {
-        this.currentReader.nextRawValue(valIn);
-        this.key = keyDeserializer.deserialize(this.key);
-        this.value = valDeserializer.deserialize(this.value);
-        return true;
-      }
-      return false;
-    }
-  }
-  
-  /**
-   * Moves to the next available input. This method may block if the input is not ready yet.
-   * Also takes care of closing the previous input.
-   * 
-   * @return true if the next input exists, false otherwise
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private boolean moveToNextInput() throws IOException {
-    if (currentReader != null) { // Close the current reader.
-      currentReader.close();
-      currentFetchedInput.free();
-    }
-    try {
-      currentFetchedInput = shuffleManager.getNextInput();
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for next available input", e);
-      throw new IOException(e);
-    }
-    if (currentFetchedInput == null) {
-      return false; // No more inputs
-    } else {
-      currentReader = openIFileReader(currentFetchedInput);
-      return true;
-    }
-  }
-
-  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
-      throws IOException {
-    if (fetchedInput.getType() == Type.MEMORY) {
-      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
-
-      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
-          mfi.getBytes(), 0, (int) mfi.getActualSize());
-    } else {
-      return new IFile.Reader(fetchedInput.getInputStream(),
-          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
-          ifileReadAheadLength, ifileBufferSize);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index a7a12ef..68aa49f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -35,18 +35,20 @@ import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 
-public class BroadcastShuffleInputEventHandler {
+public class BroadcastShuffleInputEventHandler implements ShuffleEventHandler {
 
   private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
   
-  private final BroadcastShuffleManager shuffleManager;
+  private final ShuffleManager shuffleManager;
   private final FetchedInputAllocator inputAllocator;
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
@@ -54,7 +56,7 @@ public class BroadcastShuffleInputEventHandler {
   
   
   public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
-      BroadcastShuffleManager shuffleManager,
+      ShuffleManager shuffleManager,
       FetchedInputAllocator inputAllocator, CompressionCodec codec,
       boolean ifileReadAhead, int ifileReadAheadLength) {
     this.shuffleManager = shuffleManager;
@@ -64,6 +66,7 @@ public class BroadcastShuffleInputEventHandler {
     this.ifileReadAheadLength = ifileReadAheadLength;
   }
 
+  @Override
   public void handleEvents(List<Event> events) throws IOException {
     for (Event event : events) {
       handleEvent(event);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
deleted file mode 100644
index ca58396..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ /dev/null
@@ -1,740 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.shuffle.common.FetchResult;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
-import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
-import org.apache.tez.runtime.library.shuffle.common.InputHost;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCallback {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
-  
-  private final TezInputContext inputContext;
-  private final Configuration conf;
-  private final int numInputs;
-  
-  private BroadcastShuffleInputEventHandler inputEventHandler;
-  private FetchedInputAllocator inputManager;
-  
-  private ExecutorService fetcherRawExecutor;
-  private ListeningExecutorService fetcherExecutor;
-
-  private ExecutorService schedulerRawExecutor;
-  private ListeningExecutorService schedulerExecutor;
-  private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
-  
-  private BlockingQueue<FetchedInput> completedInputs;
-  private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
-  private Set<InputIdentifier> completedInputSet;
-  private ConcurrentMap<String, InputHost> knownSrcHosts;
-  private BlockingQueue<InputHost> pendingHosts;
-  private Set<InputAttemptIdentifier> obsoletedInputs;
-  
-  private AtomicInteger numCompletedInputs = new AtomicInteger(0);
-  
-  private long startTime;
-  private long lastProgressTime;
-
-  // Required to be held when manipulating pendingHosts
-  private ReentrantLock lock = new ReentrantLock();
-  private Condition wakeLoop = lock.newCondition();
-  
-  private int numFetchers;
-  private AtomicInteger numRunningFetchers = new AtomicInteger(0);
-  
-  // Parameters required by Fetchers
-  private SecretKey shuffleSecret;
-  private int connectionTimeout;
-  private int readTimeout;
-  private CompressionCodec codec;
-  
-  private boolean ifileReadAhead;
-  private int ifileReadAheadLength;
-  private int ifileBufferSize;
-  
-  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-  
-  private volatile Throwable shuffleError;
-  
-  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-  
-  private volatile long initialMemoryAvailable = -1l;
-
-  private final TezCounter shuffledInputsCounter;
-  private final TezCounter failedShufflesCounter;
-  private final TezCounter bytesShuffledCounter;
-  private final TezCounter decompressedDataSizeCounter;
-  private final TezCounter bytesShuffledToDiskCounter;
-  private final TezCounter bytesShuffledToMemCounter;
-  
-  // TODO More counters - FetchErrors, speed?
-  
-  public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.numInputs = numInputs;
-    long initalMemReq = getInitialMemoryReq();
-    this.inputContext.requestInitialMemory(initalMemReq, this);
-    
-    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
-    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
-    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
-    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
-    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
-    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
-  }
-
-  private void configureAndStart() throws IOException {
-    Preconditions.checkState(initialMemoryAvailable != -1,
-        "Initial memory available must be configured before starting");
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
-
-    this.ifileReadAhead = conf.getBoolean(
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
-    if (this.ifileReadAhead) {
-      this.ifileReadAheadLength = conf.getInt(
-          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
-          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
-    } else {
-      this.ifileReadAheadLength = 0;
-    }
-    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
-        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-    
-    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf,
-        inputContext.getTotalMemoryAvailableToTask());
-    ((BroadcastInputManager)this.inputManager).setInitialMemoryAvailable(initialMemoryAvailable);
-    ((BroadcastInputManager)this.inputManager).configureAndStart();
-    this.inputEventHandler = new BroadcastShuffleInputEventHandler(
-        inputContext, this, this.inputManager, codec, ifileReadAhead,
-        ifileReadAheadLength);
-
-    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
-    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
-    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
-    pendingHosts = new LinkedBlockingQueue<InputHost>();
-    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-    
-    int maxConfiguredFetchers = 
-        conf.getInt(
-            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
-    
-    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
-    
-    this.fetcherRawExecutor = Executors.newFixedThreadPool(
-        numFetchers,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(
-                "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
-            .build());
-    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    
-    this.schedulerRawExecutor = Executors.newFixedThreadPool(
-        1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(
-                "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
-            .build());
-    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
-    
-    this.startTime = System.currentTimeMillis();
-    this.lastProgressTime = startTime;
-    
-    this.shuffleSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-    
-    this.connectionTimeout = conf.getInt(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
-    this.readTimeout = conf.getInt(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
-    
-    
-    LOG.info("BroadcastShuffleManager -> numInputs: " + numInputs
-        + " compressionCodec: " + (codec == null ? "NoCompressionCodec" : codec.getClass()
-        .getName()) + ", numFetchers: " + numFetchers);
-  }
-  
-  private long getInitialMemoryReq() {
-    return BroadcastInputManager.getInitialMemoryReq(conf,
-        inputContext.getTotalMemoryAvailableToTask());
-  }
-  
-  public void setInitialMemoryAvailable(long available) {
-    this.initialMemoryAvailable = available;
-  }
-
-  public void run() throws IOException {
-    configureAndStart();
-    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
-    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
-    // Shutdown this executor once this task, and the callback complete.
-    schedulerExecutor.shutdown();
-  }
-  
-  private class RunBroadcastShuffleCallable implements Callable<Void> {
-
-    @Override
-    public Void call() throws Exception {
-      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
-        lock.lock();
-        try {
-          if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
-            if (numCompletedInputs.get() < numInputs) {
-              wakeLoop.await();
-            }
-          }
-        } finally {
-          lock.unlock();
-        }
-
-        if (shuffleError != null) {
-          // InputContext has already been informed of a fatal error. Relying on
-          // tez to kill the task.
-          break;
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
-        }
-        if (numCompletedInputs.get() < numInputs) {
-          lock.lock();
-          try {
-            int maxFetchersToRun = numFetchers - numRunningFetchers.get();
-            int count = 0;
-            while (pendingHosts.peek() != null) {
-              InputHost inputHost = null;
-              try {
-                inputHost = pendingHosts.take();
-              } catch (InterruptedException e) {
-                if (isShutdown.get()) {
-                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
-                  break;
-                } else {
-                  throw e;
-                }
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
-              }
-              if (inputHost.getNumPendingInputs() > 0) {
-                LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
-                Fetcher fetcher = constructFetcherForHost(inputHost);
-                numRunningFetchers.incrementAndGet();
-                if (isShutdown.get()) {
-                  LOG.info("hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
-                }
-                ListenableFuture<FetchResult> future = fetcherExecutor
-                    .submit(fetcher);
-                Futures.addCallback(future, fetchFutureCallback);
-                if (++count >= maxFetchersToRun) {
-                  break;
-                }
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Skipping host: " + inputHost.getHost()
-                      + " since it has no inputs to process");
-                }
-              }
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-      }
-      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
-      // TODO NEWTEZ Maybe clean up inputs.
-      if (!fetcherExecutor.isShutdown()) {
-        fetcherExecutor.shutdownNow();
-      }
-      return null;
-    }
-  }
-  
-  private Fetcher constructFetcherForHost(InputHost inputHost) {
-    FetcherBuilder fetcherBuilder = new FetcherBuilder(
-        BroadcastShuffleManager.this, inputManager,
-        inputContext.getApplicationId(), shuffleSecret, conf);
-    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
-    if (codec != null) {
-      fetcherBuilder.setCompressionParameters(codec);
-    }
-    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
-
-    // Remove obsolete inputs from the list being given to the fetcher. Also
-    // remove from the obsolete list.
-    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
-        .clearAndGetPendingInputs();
-    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
-        .iterator(); inputIter.hasNext();) {
-      InputAttemptIdentifier input = inputIter.next();
-      // Avoid adding attempts which have already completed.
-      if (completedInputSet.contains(input.getInputIdentifier())) {
-        inputIter.remove();
-      }
-      // Avoid adding attempts which have been marked as OBSOLETE 
-      if (obsoletedInputs.contains(input)) {
-        inputIter.remove();
-        obsoletedInputs.remove(input);
-      }
-    }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
-    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
-        pendingInputsForHost);
-    LOG.info("Created Fetcher for host: " + inputHost.getHost()
-        + ", with inputs: " + pendingInputsForHost);
-    return fetcherBuilder.build();
-  }
-  
-  /////////////////// Methods for InputEventHandler
-  
-  public void addKnownInput(String hostName, int port,
-      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
-    InputHost host = knownSrcHosts.get(hostName);
-    if (host == null) {
-      host = new InputHost(hostName, port, inputContext.getApplicationId());
-      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
-      if (old != null) {
-        host = old;
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
-    }
-    host.addKnownInput(srcAttemptIdentifier);
-    lock.lock();
-    try {
-      boolean added = pendingHosts.offer(host);
-      if (!added) {
-        String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
-        LOG.error(errorMessage);
-        throw new TezUncheckedException(errorMessage);
-      }
-      wakeLoop.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public void addCompletedInputWithNoData(
-      InputAttemptIdentifier srcAttemptIdentifier) {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
-    
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
-        }
-      }
-    }
-
-    // Awake the loop to check for termination.
-    lock.lock();
-    try {
-      wakeLoop.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public void addCompletedInputWithData(
-      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
-      throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
-    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
-        + fetchedInput.getType());
-    // Count irrespective of whether this is a copy of an already fetched input
-    lock.lock();
-    try {
-      lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          registerCompletedInput(fetchedInput);
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another
-                            // abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
-    obsoletedInputs.add(srcAttemptIdentifier);
-    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
-  }
-  
-  
-  public void handleEvents(List<Event> events) throws IOException {
-    inputEventHandler.handleEvents(events);
-  }
-
-  /////////////////// End of Methods for InputEventHandler
-  /////////////////// Methods from FetcherCallbackHandler
-  
-  @Override
-  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
-      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
-      throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
-
-    LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
-
-    // Count irrespective of whether this is a copy of an already fetched input
-    lock.lock();
-    try {
-      lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-    
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          
-          // Processing counters for completed and commit fetches only. Need
-          // additional counters for excessive fetches - which primarily comes
-          // in after speculation or retries.
-          shuffledInputsCounter.increment(1);
-          bytesShuffledCounter.increment(fetchedBytes);
-          if (fetchedInput.getType() == Type.MEMORY) {
-            bytesShuffledToMemCounter.increment(fetchedBytes);
-          } else {
-            bytesShuffledToDiskCounter.increment(fetchedBytes);
-          }
-          decompressedDataSizeCounter.increment(decompressedLength);
-
-          registerCompletedInput(fetchedInput);
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
-  }
-
-  @Override
-  public void fetchFailed(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
-    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
-    // For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
-        + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
-        + connectFailed);
-    failedShufflesCounter.increment(1);
-    if (srcAttemptIdentifier == null) {
-      String message = "Received fetchFailure for an unknown src (null)";
-      LOG.fatal(message);
-      inputContext.fatalError(null, message);
-    } else {
-    InputReadErrorEvent readError = new InputReadErrorEvent(
-        "Fetch failure while fetching from "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(),
-                srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-                srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-        srcAttemptIdentifier.getAttemptNumber());
-    
-    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-    failedEvents.add(readError);
-    inputContext.sendEvents(failedEvents);
-    }
-  }
-  /////////////////// End of Methods from FetcherCallbackHandler
-
-  public void shutdown() throws InterruptedException {
-    isShutdown.set(true);
-    if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
-      this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers
-    }
-    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
-      this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
-    }
-  }
-  
-  private void registerCompletedInput(FetchedInput fetchedInput) {
-    lock.lock();
-    try {
-      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
-      completedInputs.add(fetchedInput);
-      if (!inputReadyNotificationSent.getAndSet(true)) {
-        inputContext.inputIsReady();
-      }
-      numCompletedInputs.incrementAndGet();
-    } finally {
-      lock.unlock();
-    }
-  }
-  
-  /////////////////// Methods for walking the available inputs
-  
-  /**
-   * @return true if there is another input ready for consumption.
-   */
-  public boolean newInputAvailable() {
-    FetchedInput head = completedInputs.peek();
-    if (head == null || head instanceof NullFetchedInput) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * @return true if all of the required inputs have been fetched.
-   */
-  public boolean allInputsFetched() {
-    lock.lock();
-    try {
-      return numCompletedInputs.get() == numInputs;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * @return the next available input, or null if there are no available inputs.
-   *         This method will block if there are currently no available inputs,
-   *         but more may become available.
-   */
-  public FetchedInput getNextInput() throws InterruptedException {
-    FetchedInput input = null;
-    do {
-      // Check for no additional inputs
-      lock.lock();
-      try {
-        input = completedInputs.peek();
-        if (input == null && allInputsFetched()) {
-          break;
-        }
-      } finally {
-        lock.unlock();
-      }
-      input = completedInputs.take(); // block
-    } while (input instanceof NullFetchedInput);
-    return input;
-  }
-  /////////////////// End of methods for walking the available inputs
-
-  @SuppressWarnings("rawtypes")
-  public BroadcastKVReader createReader(TezCounter inputRecordCounter) throws IOException {
-    return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength,
-        ifileBufferSize, inputRecordCounter);
-  }
-  
-  /**
-   * Fake input that is added to the completed input list in case an input does not have any data.
-   *
-   */
-  private class NullFetchedInput extends FetchedInput {
-
-    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void commit() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void abort() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void free() {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-  }
-  
-  
-  private class SchedulerFutureCallback implements FutureCallback<Void> {
-
-    @Override
-    public void onSuccess(Void result) {
-      LOG.info("Scheduler thread completed");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error: " + t);
-      } else {
-        LOG.error("Scheduler failed with error: ", t);
-        inputContext.fatalError(t, "Broadcast Scheduler Failed");
-      }
-    }
-    
-  }
-  
-  private class FetchFutureCallback implements FutureCallback<FetchResult> {
-
-    private void doBookKeepingForFetcherComplete() {
-      numRunningFetchers.decrementAndGet();
-      lock.lock();
-      try {
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-    
-    @Override
-    public void onSuccess(FetchResult result) {
-      Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
-      if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-        InputHost inputHost = knownSrcHosts.get(result.getHost());
-        assert inputHost != null;
-        for (InputAttemptIdentifier input : pendingInputs) {
-          inputHost.addKnownInput(input);
-        }
-        pendingHosts.add(inputHost);
-      }
-      doBookKeepingForFetcherComplete();
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
-      } else {
-        LOG.error("Fetcher failed with error: ", t);
-        shuffleError = t;
-        inputContext.fatalError(t, "Fetch failed");
-        doBookKeepingForFetcherComplete();
-      }
-    }
-  }
-
-  @Override
-  public void memoryAssigned(long assignedSize) {
-    this.initialMemoryAvailable = assignedSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
new file mode 100644
index 0000000..796890c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.readers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+/**
+ * A {@link KeyValueReader} which reads key-value pairs, one fetched input at a
+ * time, from the inputs handed out by a {@link ShuffleManager}. Inputs are
+ * consumed in the order in which the ShuffleManager returns them.
+ *
+ * @param <K> type of the keys, as configured for the intermediate input
+ * @param <V> type of the values, as configured for the intermediate input
+ */
+public class ShuffledUnorderedKVReader<K, V> implements KeyValueReader {
+
+  private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVReader.class);
+
+  private final ShuffleManager shuffleManager;
+  private final CompressionCodec codec;
+
+  private final Class<K> keyClass;
+  private final Class<V> valClass;
+  private final Deserializer<K> keyDeserializer;
+  private final Deserializer<V> valDeserializer;
+  private final DataInputBuffer keyIn;
+  private final DataInputBuffer valIn;
+
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
+
+  private final TezCounter inputRecordCounter;
+
+  private K key;
+  private V value;
+
+  // The fetched input currently being read, and the reader opened on it. Both
+  // are null before the first input is opened and after the last one is closed.
+  private FetchedInput currentFetchedInput;
+  private IFile.Reader currentReader;
+
+  // TODO Remove this once per I/O counters are separated properly. Relying on
+  // the counter at the moment will generate aggregate numbers.
+  private int numRecordsRead = 0;
+
+  /**
+   * Creates a reader over the inputs provided by the given ShuffleManager.
+   *
+   * @param shuffleManager source of the fetched inputs
+   * @param conf configuration used to look up the intermediate key/value
+   *          classes and their serialization
+   * @param codec codec passed to the IFile reader for disk-backed inputs; may
+   *          be null
+   * @param ifileReadAhead whether IFile read-ahead is enabled
+   * @param ifileReadAheadLength read-ahead length passed to the IFile reader
+   * @param ifileBufferSize buffer size passed to the IFile reader
+   * @param inputRecordCounter counter incremented once per record read
+   * @throws IOException if a deserializer cannot be opened
+   */
+  public ShuffledUnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
+      TezCounter inputRecordCounter)
+      throws IOException {
+    this.shuffleManager = shuffleManager;
+
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.ifileBufferSize = ifileBufferSize;
+    this.inputRecordCounter = inputRecordCounter;
+
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+    this.keyIn = new DataInputBuffer();
+    this.valIn = new DataInputBuffer();
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(valIn);
+  }
+
+  // TODO NEWTEZ Maybe add an interface to check whether next will block.
+
+  /**
+   * Moves to the next key/value pair.
+   *
+   * @return true if another key/value pair exists, false if there are no
+   *         more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  @Override
+  public boolean next() throws IOException {
+    // Drain the current input first; once it is exhausted keep advancing to
+    // the next fetched input until a record is found or no inputs remain.
+    do {
+      if (readNextFromCurrentReader()) {
+        inputRecordCounter.increment(1);
+        numRecordsRead++;
+        return true;
+      }
+    } while (moveToNextInput());
+    LOG.info("Num Records read: " + numRecordsRead);
+    return false;
+  }
+
+  /**
+   * @return the key deserialized by the last successful call to {@link #next()}
+   */
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return key;
+  }
+
+  /**
+   * @return the value deserialized by the last successful call to {@link #next()}
+   */
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return value;
+  }
+
+  /**
+   * Tries reading the next key and value from the current reader.
+   *
+   * @return true if a record was read, false if there is no open reader or the
+   *         current reader has no more records
+   * @throws IOException if reading or deserializing the record fails
+   */
+  private boolean readNextFromCurrentReader() throws IOException {
+    // No reader is open before the first input, or after the last one.
+    if (this.currentReader == null) {
+      return false;
+    }
+    if (!this.currentReader.nextRawKey(keyIn)) {
+      return false;
+    }
+    this.currentReader.nextRawValue(valIn);
+    this.key = keyDeserializer.deserialize(this.key);
+    this.value = valDeserializer.deserialize(this.value);
+    return true;
+  }
+
+  /**
+   * Moves to the next available input. This method may block if the input is not ready yet.
+   * Also takes care of closing the previous input.
+   *
+   * @return true if the next input exists, false otherwise
+   * @throws IOException if closing the previous input or opening the next one
+   *           fails, or if the thread is interrupted while waiting for the
+   *           next input
+   */
+  private boolean moveToNextInput() throws IOException {
+    if (currentReader != null) { // Close the current reader.
+      try {
+        currentReader.close();
+      } finally {
+        // Drop the references even if close() fails, so that a later call to
+        // next() never reads from a closed reader or frees the same input twice.
+        currentReader = null;
+        currentFetchedInput.free();
+        currentFetchedInput = null;
+      }
+    }
+    try {
+      currentFetchedInput = shuffleManager.getNextInput();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for next available input", e);
+      // Restore the interrupt status, which is lost once the exception is
+      // converted to an IOException.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+    if (currentFetchedInput == null) {
+      return false; // No more inputs
+    }
+    currentReader = openIFileReader(currentFetchedInput);
+    return true;
+  }
+
+  /**
+   * Opens an IFile reader over the given fetched input. Inputs of type
+   * {@code MEMORY} are read through an {@link InMemoryReader}; all other
+   * inputs are read from the input's stream.
+   *
+   * @param fetchedInput the input to read
+   * @return a reader positioned at the start of the input
+   * @throws IOException if the reader cannot be created
+   */
+  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+      throws IOException {
+    if (fetchedInput.getType() == Type.MEMORY) {
+      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
+
+      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
+          mfi.getBytes(), 0, (int) mfi.getActualSize());
+    } else {
+      return new IFile.Reader(fetchedInput.getInputStream(),
+          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
+          ifileReadAheadLength, ifileBufferSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index e69a955..adbeff8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -29,51 +29,68 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleInputEventHandler;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
 
 import com.google.common.base.Preconditions;
-public class ShuffledUnorderedKVInput implements LogicalInput {
+public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallback {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
   
   private Configuration conf;
   private int numInputs = -1;
-  private BroadcastShuffleManager shuffleManager;
+  private TezInputContext inputContext;
+  private ShuffleManager shuffleManager;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
   private volatile long firstEventReceivedTime = -1;
   @SuppressWarnings("rawtypes")
-  private BroadcastKVReader kvReader;
+  private ShuffledUnorderedKVReader kvReader;
   
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
   private TezCounter inputRecordCounter;
   
+  private SimpleFetchedInputAllocator inputManager;
+  private BroadcastShuffleInputEventHandler inputEventHandler;
+  
+  private volatile long initialMemoryAvailable = -1;
+  
   public ShuffledUnorderedKVInput() {
   }
 
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
+    this.inputContext = inputContext;
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
 
     if (numInputs == 0) {
       inputContext.requestInitialMemory(0l, null);
       isStarted.set(true);
       inputContext.inputIsReady();
       return Collections.emptyList();
+    } else {
+      long initalMemReq = getInitialMemoryReq();
+      this.inputContext.requestInitialMemory(initalMemReq, this);
     }
 
-    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+    this.inputRecordCounter = inputContext.getCounters().findCounter(
+        TaskCounter.INPUT_RECORDS_PROCESSED);
     return Collections.emptyList();
   }
 
@@ -81,8 +98,50 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   public void start() throws IOException {
     synchronized (this) {
       if (!isStarted.get()) {
+        ////// Initial configuration
+        Preconditions.checkState(initialMemoryAvailable != -1,
+            "Initial memory available must be configured before starting");
+        CompressionCodec codec;
+        if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+          Class<? extends CompressionCodec> codecClass = ConfigUtils
+              .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+          codec = ReflectionUtils.newInstance(codecClass, conf);
+        } else {
+          codec = null;
+        }
+        
+        boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+            TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+        int ifileReadAheadLength = 0;
+        int ifileBufferSize = 0;
+
+        if (ifileReadAhead) {
+          ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+              TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+        }
+        ifileBufferSize = conf.getInt("io.file.buffer.size",
+            TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+        
+        this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs);
+        
+        this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
+            inputContext.getTotalMemoryAvailableToTask());
+        inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
+        inputManager.configureAndStart();
+        
+        this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+            inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
+            ifileReadAheadLength);
+
+        this.shuffleManager.setCompressionCodec(codec);
+        this.shuffleManager.setIfileParameters(ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+        this.shuffleManager.setFetchedInputAllocator(inputManager);
+        this.shuffleManager.setInputEventHandler(inputEventHandler);        
+        ////// End of Initial configuration
+
         this.shuffleManager.run();
-        this.kvReader = this.shuffleManager.createReader(inputRecordCounter);
+        this.kvReader = createReader(inputRecordCounter, codec,
+            ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
         List<Event> pending = new LinkedList<Event>();
         pendingEvents.drainTo(pending);
         if (pending.size() > 0) {
@@ -129,6 +188,9 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
           if (firstEventReceivedTime == -1) {
             firstEventReceivedTime = System.currentTimeMillis();
           }
+          // This queue will keep growing if the Processor decides never to
+          // start the Input. The Input, however, has no way of knowing whether
+          // start will be invoked or not.
           pendingEvents.addAll(inputEvents);
           return;
         }
@@ -139,7 +201,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
 
   @Override
   public List<Event> close() throws Exception {
-    if (numInputs != 0) {
+    if (this.shuffleManager != null) {
       this.shuffleManager.shutdown();
     }
     return null;
@@ -150,4 +212,23 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     this.numInputs = numInputs;
   }
 
-}
+  /**
+   * Records the memory assigned to this Input. The value is checked, and handed
+   * to the fetched-input allocator, when the Input is started.
+   *
+   * @param assignedSize the memory size which was assigned
+   */
+  @Override
+  public void memoryAssigned(long assignedSize) {
+    initialMemoryAvailable = assignedSize;
+  }
+
+  /**
+   * Computes the initial memory request for this Input by delegating to
+   * {@link SimpleFetchedInputAllocator}, based on the configuration and the
+   * total memory available to the task.
+   *
+   * @return the initial memory to request
+   */
+  private long getInitialMemoryReq() {
+    long totalTaskMemory = inputContext.getTotalMemoryAvailableToTask();
+    return SimpleFetchedInputAllocator.getInitialMemoryReq(conf, totalTaskMemory);
+  }
+  
+  
+  /**
+   * Creates the key-value reader which reads from this Input's ShuffleManager.
+   *
+   * @param inputRecordCounter counter passed to the reader
+   * @param codec compression codec passed to the reader; may be null
+   * @param ifileBufferSize IFile buffer size passed to the reader
+   * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
+   * @param ifileReadAheadLength IFile read-ahead length passed to the reader
+   * @return the newly created reader
+   * @throws IOException if the reader cannot be created
+   */
+  @SuppressWarnings("rawtypes")
+  private ShuffledUnorderedKVReader createReader(TezCounter inputRecordCounter, CompressionCodec codec,
+      int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
+      throws IOException {
+    ShuffledUnorderedKVReader reader = new ShuffledUnorderedKVReader(shuffleManager, conf,
+        codec, ifileReadAheadEnabled, ifileReadAheadLength, ifileBufferSize,
+        inputRecordCounter);
+    return reader;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
new file mode 100644
index 0000000..ee9979c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleEventHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Contract for components which process a batch of {@link Event}s.
+ */
+public interface ShuffleEventHandler {
+
+  /**
+   * Handles the given events.
+   *
+   * @param events the events to handle
+   * @throws IOException if an event cannot be handled
+   */
+  void handleEvents(List<Event> events) throws IOException;
+}