Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [7/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Token<JobTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+          && service.equals(token.getService())) {
+        return (Token<JobTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}
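
A minimal usage sketch (not part of this commit) of the selector above, imports omitted. The service string and the use of the current user's credential set are illustrative assumptions:

    // Hypothetical caller: pick the job token registered for a given
    // shuffle service address out of the current user's credentials.
    Text service = new Text("host:8080");   // assumed service key
    Token<JobTokenIdentifier> jobToken =
        new JobTokenSelector().selectToken(
            service, UserGroupInformation.getCurrentUser().getTokens());
    if (jobToken == null) {
      // no job token registered for this service; the caller decides how to fail
    }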

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+
+import javax.crypto.SecretKey;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Utilities for generating keys and hashes, and verifying them, for the shuffle.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SecureShuffleUtils {
+  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+  
+  /**
+   * Calculate the Base64-encoded hash of msg.
+   * @param msg the message to hash
+   * @param key the secret key
+   * @return the Base64-encoded hash
+   */
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
+  }
+  
+  /**
+   * Calculate the hash of msg.
+   * @param msg the message to hash
+   * @param key the secret key
+   * @return the raw hash bytes
+   */
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
+  }
+  
+  /**
+   * Verify that hash equals HMacHash(msg).
+   * @param hash the hash to verify
+   * @param msg the message that was hashed
+   * @param key the secret key
+   * @return true if they are the same
+   */
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key);
+    return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+  }
+  
+  /**
+   * Auxiliary utility to calculate the hash of a String.
+   * @param enc_str the string to hash
+   * @param key the secret key
+   * @return the Base64-encoded hash
+   * @throws IOException
+   */
+  public static String hashFromString(String enc_str, SecretKey key) 
+  throws IOException {
+    return generateHash(enc_str.getBytes(), key); 
+  }
+  
+  /**
+   * Verify that base64Hash is the same as HMacHash(msg).
+   * @param base64Hash the Base64-encoded hash
+   * @param msg the message that was hashed
+   * @param key the secret key
+   * @throws IOException if they are not the same
+   */
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
+  throws IOException {
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+    
+    boolean res = verifyHash(hash, msg.getBytes(), key);
+    
+    if (!res) {
+      throw new IOException("Verification of the hashReply failed");
+    }
+  }
+  
+  /**
+   * Shuffle-specific utility: build the string to encode from a URL.
+   * @param url
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(URL url) {
+    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+  }
+  /**
+   * Shuffle-specific utility: build the string to encode from an HTTP request.
+   * @param request
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(HttpServletRequest request ) {
+    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+        request.getLocalPort());
+  }
+  /**
+   * Shuffle-specific utility: build the string to encode from a URI path,
+   * query string and port.
+   * @param uri_path
+   * @param uri_query
+   * @param port
+   * @return string for encoding
+   */
+  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+    return String.valueOf(port) + uri_path + "?" + uri_query;
+  }
+  
+  
+  /**
+   * Convert a byte array to a hex String.
+   * @param ba the bytes to convert
+   * @return a string with the hex value of the key
+   */
+  public static String toHex(byte[] ba) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    for(byte b: ba) {
+      ps.printf("%x", b);
+    }
+    return baos.toString();
+  }
+}
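
A minimal round-trip sketch (not part of this commit) of the hashing utilities above. The key construction via SecretKeySpec, the HmacSHA1 algorithm name, and the message string are assumptions for illustration only; in the real flow the SecretKey comes from the job token via JobTokenSecretManager:

    SecretKey key = new SecretKeySpec("shared-job-secret".getBytes(), "HmacSHA1");

    // What a fetcher sends: an HMAC of the request it is about to make.
    String msg = "8080/mapOutput?job=job_X&reduce=0&map=attempt_Y";   // illustrative
    String urlHash = SecureShuffleUtils.hashFromString(msg, key);

    // What the other side does to check it: recompute and compare.
    SecureShuffleUtils.verifyReply(urlHash, msg, key);   // throws IOException on mismatch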

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.api.Master;
+import org.apache.tez.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+
+class EventFetcher extends Thread {
+  private static final long SLEEP_TIME = 1000;
+  private static final int MAX_RETRIES = 10;
+  private static final int RETRY_PERIOD = 5000;
+  private static final Log LOG = LogFactory.getLog(EventFetcher.class);
+
+  private final TezTaskAttemptID reduce;
+  private final Master umbilical;
+  private final ShuffleScheduler scheduler;
+  private int fromEventIdx = 0;
+  private int maxEventsToFetch;
+  private ExceptionReporter exceptionReporter = null;
+  
+  private int maxMapRuntime = 0;
+
+  private volatile boolean stopped = false;
+  
+  public EventFetcher(TezTaskAttemptID reduce,
+                      Master umbilical,
+                      ShuffleScheduler scheduler,
+                      ExceptionReporter reporter,
+                      int maxEventsToFetch) {
+    setName("EventFetcher for fetching Map Completion Events");
+    setDaemon(true);    
+    this.reduce = reduce;
+    this.umbilical = umbilical;
+    this.scheduler = scheduler;
+    exceptionReporter = reporter;
+    this.maxEventsToFetch = maxEventsToFetch;
+  }
+
+  @Override
+  public void run() {
+    int failures = 0;
+    LOG.info(reduce + " Thread started: " + getName());
+    
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          int numNewMaps = getMapCompletionEvents();
+          failures = 0;
+          if (numNewMaps > 0) {
+            LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
+          }
+          LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
+          if (!Thread.currentThread().isInterrupted()) {
+            Thread.sleep(SLEEP_TIME);
+          }
+        } catch (InterruptedException e) {
+          LOG.info("EventFetcher is interrupted.. Returning");
+          return;
+        } catch (IOException ie) {
+          LOG.info("Exception in getting events", ie);
+          // check to see whether to abort
+          if (++failures >= MAX_RETRIES) {
+            throw new IOException("too many failures downloading events", ie);
+          }
+          // sleep for a bit
+          if (!Thread.currentThread().isInterrupted()) {
+            Thread.sleep(RETRY_PERIOD);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      return;
+    } catch (Throwable t) {
+      exceptionReporter.reportException(t);
+      return;
+    }
+  }
+
+  public void shutDown() {
+    this.stopped = true;
+    interrupt();
+    try {
+      join(5000);
+    } catch(InterruptedException ie) {
+      LOG.warn("Got interrupted while joining " + getName(), ie);
+    }
+  }
+  
+  /**
+   * Queries the {@link Master} (the task umbilical) for a set of
+   * map-completion events starting from a given event ID.
+   * @return the number of newly-completed (successful) maps
+   * @throws IOException
+   */
+  protected int getMapCompletionEvents() throws IOException {
+    
+    int numNewMaps = 0;
+    TezDependentTaskCompletionEvent events[] = null;
+
+    do {
+      TezTaskDependencyCompletionEventsUpdate update =
+          umbilical.getDependentTasksCompletionEvents(
+              reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              reduce);
+      events = update.getDependentTaskCompletionEvents();
+      LOG.debug("Got " + events.length + " map completion events from " +
+               fromEventIdx);
+
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the
+      // reducer, the only option to sync with the (possibly restarted) Master
+      // is to reset the events index.
+      if (update.shouldReset()) {
+        fromEventIdx = 0;
+        scheduler.resetKnownMaps();
+      }
+
+      // Update the last seen event ID
+      fromEventIdx += events.length;
+
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TezDependentTaskCompletionEvent event : events) {
+        switch (event.getStatus()) {
+        case SUCCEEDED:
+          URI u = getBaseURI(event.getTaskTrackerHttp());
+          scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
+              u.toString(),
+              event.getTaskAttemptID());
+          numNewMaps ++;
+          int duration = event.getTaskRunTime();
+          if (duration > maxMapRuntime) {
+            maxMapRuntime = duration;
+            scheduler.informMaxMapRunTime(maxMapRuntime);
+          }
+          break;
+        case FAILED:
+        case KILLED:
+        case OBSOLETE:
+          scheduler.obsoleteMapOutput(event.getTaskAttemptID());
+          LOG.info("Ignoring obsolete output of " + event.getStatus() + 
+              " map-task: '" + event.getTaskAttemptID() + "'");
+          break;
+        case TIPFAILED:
+          scheduler.tipFailed(event.getTaskAttemptID().getTaskID());
+          LOG.info("Ignoring output of failed map TIP: '" +  
+              event.getTaskAttemptID() + "'");
+          break;
+        }
+      }
+    } while (events.length == maxEventsToFetch);
+
+    return numNewMaps;
+  }
+  
+  private URI getBaseURI(String url) {
+    StringBuffer baseUrl = new StringBuffer(url);
+    if (!url.endsWith("/")) {
+      baseUrl.append("/");
+    }
+    baseUrl.append("mapOutput?job=");
+    baseUrl.append(reduce.getJobID());
+    baseUrl.append("&reduce=");
+    baseUrl.append(reduce.getTaskID().getId());
+    baseUrl.append("&map=");
+    URI u = URI.create(baseUrl.toString());
+    return u;
+  }
+}
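
For illustration (not part of this commit), the owning shuffle would drive this thread roughly as below; reduceAttemptId, umbilical, scheduler and exceptionReporter stand for instances of the constructor's parameter types and are assumptions of this sketch:

    EventFetcher eventFetcher = new EventFetcher(
        reduceAttemptId, umbilical, scheduler, exceptionReporter,
        500 /* illustrative max events per poll */);
    eventFetcher.start();     // daemon thread; polls the umbilical for completion events
    // ... run the rest of the shuffle ...
    eventFetcher.shutDown();  // sets the stop flag, interrupts, and joins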

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+/**
+ * An interface for reporting exceptions to other threads
+ */
+interface ExceptionReporter {
+  void reportException(Throwable t);
+}
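
For illustration (not part of this commit), a shuffle owner in this package might hand the fetcher threads a reporter roughly like the sketch below, so that a failure in any thread can fail the whole task:

    final Thread taskThread = Thread.currentThread();
    ExceptionReporter reporter = new ExceptionReporter() {
      @Override
      public void reportException(Throwable t) {
        // remember t somewhere the task can rethrow it, then wake the task (sketch only)
        taskThread.interrupt();
      }
    };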

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.IDUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.security.SecureShuffleUtils;
+import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.engine.common.sort.impl.IFileInputStream;
+import org.apache.tez.records.TezTaskAttemptID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class Fetcher extends Thread {
+  
+  private static final Log LOG = LogFactory.getLog(Fetcher.class);
+  
+  /** Basic/unit connection timeout (in milliseconds) */
+  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+  
+  private final Progressable reporter;
+  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+                                    CONNECTION, WRONG_REDUCE}
+  
+  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+  private final TezCounter connectionErrs;
+  private final TezCounter ioErrs;
+  private final TezCounter wrongLengthErrs;
+  private final TezCounter badIdErrs;
+  private final TezCounter wrongMapErrs;
+  private final TezCounter wrongReduceErrs;
+  private final MergeManager merger;
+  private final ShuffleScheduler scheduler;
+  private final ShuffleClientMetrics metrics;
+  private final ExceptionReporter exceptionReporter;
+  private final int id;
+  private static int nextId = 0;
+  private final int reduce;
+  
+  private final int connectionTimeout;
+  private final int readTimeout;
+  
+  // Decompression of map-outputs
+  private final CompressionCodec codec;
+  private final Decompressor decompressor;
+  private final SecretKey jobTokenSecret;
+
+  private volatile boolean stopped = false;
+
+  private Configuration job;
+
+  private static boolean sslShuffle;
+  private static SSLFactory sslFactory;
+
+  public Fetcher(Configuration job, TezTaskAttemptID reduceId, 
+      ShuffleScheduler scheduler, MergeManager merger,
+      TezTaskReporter reporter, ShuffleClientMetrics metrics,
+      ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+    this.job = job;
+    this.reporter = reporter;
+    this.scheduler = scheduler;
+    this.merger = merger;
+    this.metrics = metrics;
+    this.exceptionReporter = exceptionReporter;
+    this.id = ++nextId;
+    this.reduce = reduceId.getTaskID().getId();
+    this.jobTokenSecret = jobTokenSecret;
+    ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.IO_ERROR.toString());
+    wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_LENGTH.toString());
+    badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.BAD_ID.toString());
+    wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_MAP.toString());
+    connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.CONNECTION.toString());
+    wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_REDUCE.toString());
+
+    if (ConfigUtils.getCompressMapOutput(job)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
+      decompressor = CodecPool.getDecompressor(codec);
+    } else {
+      codec = null;
+      decompressor = null;
+    }
+
+    this.connectionTimeout = 
+        job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = 
+        job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+
+    setName("fetcher#" + id);
+    setDaemon(true);
+
+    synchronized (Fetcher.class) {
+      sslShuffle = job.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
+          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+      if (sslShuffle && sslFactory == null) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+        try {
+          sslFactory.init();
+        } catch (Exception ex) {
+          sslFactory.destroy();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+  public void run() {
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        MapHost host = null;
+        try {
+          // If merge is on, block
+          merger.waitForInMemoryMerge();
+
+          // Get a host to shuffle from
+          host = scheduler.getHost();
+          metrics.threadBusy();
+
+          // Shuffle
+          copyFromHost(host);
+        } finally {
+          if (host != null) {
+            scheduler.freeHost(host);
+            metrics.threadFree();            
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      return;
+    } catch (Throwable t) {
+      exceptionReporter.reportException(t);
+    }
+  }
+
+  public void shutDown() throws InterruptedException {
+    this.stopped = true;
+    interrupt();
+    try {
+      join(5000);
+    } catch (InterruptedException ie) {
+      LOG.warn("Got interrupt while joining " + getName(), ie);
+    }
+    if (sslFactory != null) {
+      sslFactory.destroy();
+    }
+  }
+
+  @VisibleForTesting
+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    if (sslShuffle) {
+      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+      try {
+        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+    }
+    return conn;
+  }
+  
+  /**
+   * The crux of the matter...
+   * 
+   * @param host {@link MapHost} from which we need to  
+   *              shuffle available map-outputs.
+   */
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
+    // Get completed maps on 'host'
+    List<TezTaskAttemptID> maps = scheduler.getMapsForHost(host);
+    
+    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
+    // especially at the tail of large jobs
+    if (maps.size() == 0) {
+      return;
+    }
+    
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+        + maps);
+    }
+    
+    // List of maps to be fetched yet
+    Set<TezTaskAttemptID> remaining = new HashSet<TezTaskAttemptID>(maps);
+    
+    // Construct the url and connect
+    DataInputStream input;
+    boolean connectSucceeded = false;
+    
+    try {
+      URL url = getMapOutputURL(host, maps);
+      HttpURLConnection connection = openConnection(url);
+      
+      // generate hash of the url
+      String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+      String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+      
+      // put url hash into http header
+      connection.addRequestProperty(
+          SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+      // set the read timeout
+      connection.setReadTimeout(readTimeout);
+      connect(connection, connectionTimeout);
+      connectSucceeded = true;
+      input = new DataInputStream(connection.getInputStream());
+
+      // Validate response code
+      int rc = connection.getResponseCode();
+      if (rc != HttpURLConnection.HTTP_OK) {
+        throw new IOException(
+            "Got invalid response code " + rc + " from " + url +
+            ": " + connection.getResponseMessage());
+      }
+      
+      // get the replyHash which is HMac of the encHash we sent to the server
+      String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+      if(replyHash==null) {
+        throw new IOException("security validation of TT Map output failed");
+      }
+      LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+      // verify that replyHash is HMac of encHash
+      SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+      LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
+    } catch (IOException ie) {
+      ioErrs.increment(1);
+      LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
+               " map outputs", ie);
+
+      // If connect did not succeed, just mark all the maps as failed,
+      // indirectly penalizing the host
+      if (!connectSucceeded) {
+        for(TezTaskAttemptID left: remaining) {
+          scheduler.copyFailed(left, host, connectSucceeded);
+        }
+      } else {
+        // If we got a read error at this stage, it implies there was a problem
+        // with the first map, typically a lost map. So, penalize only that map
+        // and add back the rest.
+        TezTaskAttemptID firstMap = maps.get(0);
+        scheduler.copyFailed(firstMap, host, connectSucceeded);
+      }
+      
+      // Add back all the remaining maps, WITHOUT marking them as failed
+      for(TezTaskAttemptID left: remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+      
+      return;
+    }
+    
+    try {
+      // Loop through available map-outputs and fetch them
+      // On any error, failedTasks is not null and we exit
+      // after putting back the remaining maps to the 
+      // yet_to_be_fetched list and marking the failed tasks.
+      TezTaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+      
+      if(failedTasks != null && failedTasks.length > 0) {
+        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        for(TezTaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
+      }
+      
+      IOUtils.cleanup(LOG, input);
+      
+      // Sanity check
+      if (failedTasks == null && !remaining.isEmpty()) {
+        throw new IOException("server didn't return all expected map outputs: "
+            + remaining.size() + " left.");
+      }
+    } finally {
+      for (TezTaskAttemptID left : remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+    }
+  }
+  
+  private static TezTaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TezTaskAttemptID[0];
+  
+  private TezTaskAttemptID[] copyMapOutput(MapHost host,
+                                DataInputStream input,
+                                Set<TezTaskAttemptID> remaining) {
+    MapOutput mapOutput = null;
+    TezTaskAttemptID mapId = null;
+    long decompressedLength = -1;
+    long compressedLength = -1;
+    
+    try {
+      long startTime = System.currentTimeMillis();
+      int forReduce = -1;
+      //Read the shuffle header
+      try {
+        ShuffleHeader header = new ShuffleHeader();
+        header.readFields(input);
+        mapId = IDUtils.toTaskAttemptId(header.mapId);
+        compressedLength = header.compressedLength;
+        decompressedLength = header.uncompressedLength;
+        forReduce = header.forReduce;
+      } catch (IllegalArgumentException e) {
+        badIdErrs.increment(1);
+        LOG.warn("Invalid map id ", e);
+        //Don't know which one was bad, so consider all of them as bad
+        return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+      }
+
+ 
+      // Do some basic sanity verification
+      if (!verifySanity(compressedLength, decompressedLength, forReduce,
+          remaining, mapId)) {
+        return new TezTaskAttemptID[] {mapId};
+      }
+      
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+            ", decomp len: " + decompressedLength);
+      }
+      
+      // Get the location for the map output - either in-memory or on-disk
+      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      
+      // Check if we can shuffle *now* ...
+      if (mapOutput.getType() == Type.WAIT) {
+        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+        //Not an error but wait to process data.
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      } 
+      
+      // Go!
+      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
+               mapOutput.getMapId() + " decomp: " +
+               decompressedLength + " len: " + compressedLength + " to " +
+               mapOutput.getType());
+      if (mapOutput.getType() == Type.MEMORY) {
+        shuffleToMemory(host, mapOutput, input, 
+                        (int) decompressedLength, (int) compressedLength);
+      } else {
+        shuffleToDisk(host, mapOutput, input, compressedLength);
+      }
+      
+      // Inform the shuffle scheduler
+      long endTime = System.currentTimeMillis();
+      scheduler.copySucceeded(mapId, host, compressedLength, 
+                              endTime - startTime, mapOutput);
+      // Note successful shuffle
+      remaining.remove(mapId);
+      metrics.successFetch();
+      return null;
+    } catch (IOException ioe) {
+      ioErrs.increment(1);
+      if (mapId == null || mapOutput == null) {
+        LOG.info("fetcher#" + id + " failed to read map header" + 
+                 mapId + " decomp: " + 
+                 decompressedLength + ", " + compressedLength, ioe);
+        if(mapId == null) {
+          return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+        } else {
+          return new TezTaskAttemptID[] {mapId};
+        }
+      }
+      
+      LOG.warn("Failed to shuffle output of " + mapId + 
+               " from " + host.getHostName(), ioe); 
+
+      // Inform the shuffle-scheduler
+      mapOutput.abort();
+      metrics.failedFetch();
+      return new TezTaskAttemptID[] {mapId};
+    }
+
+  }
+  
+  /**
+   * Do some basic verification on the input received -- being defensive.
+   * @param compressedLength
+   * @param decompressedLength
+   * @param forReduce
+   * @param remaining
+   * @param mapId
+   * @return true if the verification succeeded, false otherwise
+   */
+  private boolean verifySanity(long compressedLength, long decompressedLength,
+      int forReduce, Set<TezTaskAttemptID> remaining, TezTaskAttemptID mapId) {
+    if (compressedLength < 0 || decompressedLength < 0) {
+      wrongLengthErrs.increment(1);
+      LOG.warn(getName() + " invalid lengths in map output header: id: " +
+               mapId + " len: " + compressedLength + ", decomp len: " + 
+               decompressedLength);
+      return false;
+    }
+    
+    if (forReduce != reduce) {
+      wrongReduceErrs.increment(1);
+      LOG.warn(getName() + " data for the wrong reduce map: " +
+               mapId + " len: " + compressedLength + " decomp len: " +
+               decompressedLength + " for reduce " + forReduce);
+      return false;
+    }
+
+    // Sanity check
+    if (!remaining.contains(mapId)) {
+      wrongMapErrs.increment(1);
+      LOG.warn("Invalid map-output! Received output for " + mapId);
+      return false;
+    }
+    
+    return true;
+  }
+
+  /**
+   * Create the map-output URL. It will contain all the map ids,
+   * separated by commas.
+   * @param host
+   * @param maps
+   * @return the map-output URL
+   * @throws MalformedURLException
+   */
+  private URL getMapOutputURL(MapHost host, List<TezTaskAttemptID> maps
+                              )  throws MalformedURLException {
+    // Get the base url
+    StringBuffer url = new StringBuffer(host.getBaseUrl());
+    
+    boolean first = true;
+    for (TezTaskAttemptID mapId : maps) {
+      if (!first) {
+        url.append(",");
+      }
+      url.append(mapId);
+      first = false;
+    }
+   
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
+    }
+    return new URL(url.toString());
+  }
+  
+  /** 
+   * The connection establishment is attempted multiple times and is given up 
+   * only on the last failure. Instead of connecting with a timeout of 
+   * X, we try connecting with a timeout of x < X but multiple times. 
+   */
+  private void connect(URLConnection connection, int connectionTimeout)
+  throws IOException {
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout "
+                            + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    while (true) {
+      try {
+        connection.connect();
+        break;
+      } catch (IOException ioe) {
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value of timeout is used here
+        if (connectionTimeout == 0) {
+          throw ioe;
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+      }
+    }
+  }
+
+  private void shuffleToMemory(MapHost host, MapOutput mapOutput, 
+                               InputStream input, 
+                               int decompressedLength, 
+                               int compressedLength) throws IOException {    
+    IFileInputStream checksumIn = 
+      new IFileInputStream(input, compressedLength, job);
+
+    input = checksumIn;       
+  
+    // Are map-outputs compressed?
+    if (codec != null) {
+      decompressor.reset();
+      input = codec.createInputStream(input, decompressor);
+    }
+  
+    // Copy map-output into an in-memory buffer
+    byte[] shuffleData = mapOutput.getMemory();
+    
+    try {
+      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+      metrics.inputBytes(shuffleData.length);
+      reporter.progress();
+      LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
+               mapOutput.getMapId());
+    } catch (IOException ioe) {      
+      // Close the streams
+      IOUtils.cleanup(LOG, input);
+
+      // Re-throw
+      throw ioe;
+    }
+
+  }
+  
+  private void shuffleToDisk(MapHost host, MapOutput mapOutput, 
+                             InputStream input, 
+                             long compressedLength) 
+  throws IOException {
+    // Copy data to local-disk
+    OutputStream output = mapOutput.getDisk();
+    long bytesLeft = compressedLength;
+    try {
+      final int BYTES_TO_READ = 64 * 1024;
+      byte[] buf = new byte[BYTES_TO_READ];
+      while (bytesLeft > 0) {
+        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        if (n < 0) {
+          throw new IOException("read past end of stream reading " + 
+                                mapOutput.getMapId());
+        }
+        output.write(buf, 0, n);
+        bytesLeft -= n;
+        metrics.inputBytes(n);
+        reporter.progress();
+      }
+
+      LOG.info("Read " + (compressedLength - bytesLeft) + 
+               " bytes from map-output for " +
+               mapOutput.getMapId());
+
+      output.close();
+    } catch (IOException ioe) {
+      // Close the streams
+      IOUtils.cleanup(LOG, input, output);
+
+      // Re-throw
+      throw ioe;
+    }
+
+    // Sanity check
+    if (bytesLeft != 0) {
+      throw new IOException("Incomplete map output received for " +
+                            mapOutput.getMapId() + " from " +
+                            host.getHostName() + " (" + 
+                            bytesLeft + " bytes missing of " + 
+                            compressedLength + ")"
+      );
+    }
+  }
+}
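
The header exchange in copyFromHost() above amounts to a mutual HMAC handshake over the job-token secret. A condensed sketch of both halves follows; the server half is hypothetical here (the shuffle handler is not part of this file), and request/response stand for the servlet request and response objects:

    // Client side (what the Fetcher does):
    String msg     = SecureShuffleUtils.buildMsgFrom(url);                  // port + path + "?" + query
    String encHash = SecureShuffleUtils.hashFromString(msg, jobTokenSecret);
    connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
    // ... connect, check the response code, then ...
    String replyHash =
        connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);     // proves the server knew the secret

    // Server side (hypothetical counterpart):
    String expected = SecureShuffleUtils.buildMsgFrom(request);             // same string, server-side view
    String urlHash  = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
    SecureShuffleUtils.verifyReply(urlHash, expected, jobTokenSecret);      // proves the client knew the secret
    response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH,
        SecureShuffleUtils.hashFromString(urlHash, jobTokenSecret));        // hash the client's hash back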

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.IFile.Reader;
+import org.apache.tez.records.TezTaskAttemptID;
+
+/**
+ * An <code>IFile.Reader</code> implementation used to read map-outputs held in memory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryReader extends Reader {
+  private final TezTaskAttemptID taskAttemptId;
+  private final MergeManager merger;
+  DataInputBuffer memDataIn = new DataInputBuffer();
+  private int start;
+  private int length;
+  private int prevKeyPos;
+
+  public InMemoryReader(MergeManager merger, TezTaskAttemptID taskAttemptId,
+                        byte[] data, int start, int length)
+  throws IOException {
+    super(null, null, length - start, null, null);
+    this.merger = merger;
+    this.taskAttemptId = taskAttemptId;
+
+    buffer = data;
+    bufferSize = (int)fileLength;
+    memDataIn.reset(buffer, start, length);
+    this.start = start;
+    this.length = length;
+  }
+
+  @Override
+  public void reset(int offset) {
+    memDataIn.reset(buffer, start + offset, length);
+    bytesRead = offset;
+    eof = false;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    // InMemoryReader does not initialize streams like Reader, so in.getPos()
+    // would not work. Instead, return the number of uncompressed bytes read,
+    // which will be correct since in-memory data is not compressed.
+    return bytesRead;
+  }
+  
+  @Override
+  public long getLength() { 
+    return fileLength;
+  }
+  
+  private void dumpOnError() {
+    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+    System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
+                       " to " + dumpFile.getAbsolutePath());
+    try {
+      FileOutputStream fos = new FileOutputStream(dumpFile);
+      fos.write(buffer, 0, bufferSize);
+      fos.close();
+    } catch (IOException ioe) {
+      System.err.println("Failed to dump map-output of " + taskAttemptId);
+    }
+  }
+  
+  public KeyState readRawKey(DataInputBuffer key) throws IOException {
+    try {
+      if (!positionToNextRecord(memDataIn)) {
+        return KeyState.NO_KEY;
+      }
+      // Setup the key
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();      
+      if(currentKeyLength == IFile.RLE_MARKER) {
+        key.reset(data, prevKeyPos, prevKeyLength);
+        currentKeyLength = prevKeyLength;
+        return KeyState.SAME_KEY;
+      }      
+      key.reset(data, pos, currentKeyLength);
+      prevKeyPos = pos;
+      // Position for the next value
+      long skipped = memDataIn.skip(currentKeyLength);
+      if (skipped != currentKeyLength) {
+        throw new IOException("Rec# " + recNo + 
+            ": Failed to skip past key of length: " + 
+            currentKeyLength);
+      }
+
+      // Record the byte
+      bytesRead += currentKeyLength;
+      return KeyState.NEW_KEY;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+  
+  public void nextRawValue(DataInputBuffer value) throws IOException {
+    try {
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      value.reset(data, pos, currentValueLength);
+
+      // Position for the next record
+      long skipped = memDataIn.skip(currentValueLength);
+      if (skipped != currentValueLength) {
+        throw new IOException("Rec# " + recNo + 
+            ": Failed to skip past value of length: " + 
+            currentValueLength);
+      }
+      // Record the byte
+      bytesRead += currentValueLength;
+
+      ++recNo;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+    
+  public void close() {
+    // Release
+    dataIn = null;
+    buffer = null;
+    // Inform the MergeManager
+    if (merger != null) {
+      merger.unreserve(bufferSize);
+    }
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.IFileOutputStream;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryWriter extends Writer {
+  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
+  
+  private DataOutputStream out;
+  
+  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
+    super(null);
+    this.out = 
+      new DataOutputStream(new IFileOutputStream(arrayStream));
+  }
+  
+  public void append(Object key, Object value) throws IOException {
+    throw new UnsupportedOperationException
+    ("InMemoryWriter.append(K key, V value");
+  }
+  
+  public void append(DataInputBuffer key, DataInputBuffer value)
+  throws IOException {
+    int keyLength = key.getLength() - key.getPosition();
+    if (keyLength < 0) {
+      throw new IOException("Negative key-length not allowed: " + keyLength + 
+                            " for " + key);
+    }
+    
+    boolean sameKey = (key == IFile.REPEAT_KEY);
+    
+    int valueLength = value.getLength() - value.getPosition();
+    if (valueLength < 0) {
+      throw new IOException("Negative value-length not allowed: " + 
+                            valueLength + " for " + value);
+    }
+    
+    if(sameKey) {
+      WritableUtils.writeVInt(out, IFile.RLE_MARKER);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(value.getData(), value.getPosition(), valueLength);
+    } else {
+      LOG.info("XXX InMemWriter.append" + 
+          " key.data=" + key.getData() + 
+          " key.pos=" + key.getPosition() + 
+          " key.len=" +key.getLength() + 
+          " val.data=" + value.getData() + 
+          " val.pos=" + value.getPosition() + 
+          " val.len=" + value.getLength());
+      WritableUtils.writeVInt(out, keyLength);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(key.getData(), key.getPosition(), keyLength); 
+      out.write(value.getData(), value.getPosition(), valueLength);      
+    }
+     
+  }
+
+  public void close() throws IOException {
+    // Write EOF_MARKER for key/value length
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    
+    // Close the stream 
+    out.close();
+    out = null;
+  }
+
+}
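
A minimal in-memory round-trip sketch (not part of this commit), pairing this writer with the InMemoryReader added above. The buffer size, key/value bytes and mapAttemptId are illustrative, passing a null MergeManager to the reader is only for the sketch, and KeyState is the enum inherited from the IFile.Reader base class:

    BoundedByteArrayOutputStream backing = new BoundedByteArrayOutputStream(64 * 1024);
    InMemoryWriter writer = new InMemoryWriter(backing);

    DataInputBuffer key = new DataInputBuffer();
    DataInputBuffer value = new DataInputBuffer();
    byte[] k = "key-0".getBytes();
    byte[] v = "value-0".getBytes();
    key.reset(k, 0, k.length);
    value.reset(v, 0, v.length);
    writer.append(key, value);
    writer.close();                                  // writes the EOF markers

    InMemoryReader reader = new InMemoryReader(
        null /* merger not needed for the sketch */, mapAttemptId,
        backing.getBuffer(), 0, backing.size());
    DataInputBuffer keyIn = new DataInputBuffer();
    DataInputBuffer valIn = new DataInputBuffer();
    while (reader.readRawKey(keyIn) != KeyState.NO_KEY) {
      reader.nextRawValue(valIn);
      // consume keyIn / valIn ...
    }
    reader.close();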

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.records.TezTaskAttemptID;
+
+
+class MapHost {
+  
+  public static enum State {
+    IDLE,               // No map outputs available
+    BUSY,               // Map outputs are being fetched
+    PENDING,            // Known map outputs which need to be fetched
+    PENALIZED           // Host penalized due to shuffle failures
+  }
+  
+  private State state = State.IDLE;
+  private final String hostName;
+  private final String baseUrl;
+  private List<TezTaskAttemptID> maps = new ArrayList<TezTaskAttemptID>();
+  
+  public MapHost(String hostName, String baseUrl) {
+    this.hostName = hostName;
+    this.baseUrl = baseUrl;
+  }
+  
+  public State getState() {
+    return state;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public String getBaseUrl() {
+    return baseUrl;
+  }
+
+  public synchronized void addKnownMap(TezTaskAttemptID mapId) {
+    maps.add(mapId);
+    if (state == State.IDLE) {
+      state = State.PENDING;
+    }
+  }
+  
+  public synchronized List<TezTaskAttemptID> getAndClearKnownMaps() {
+    List<TezTaskAttemptID> currentKnownMaps = maps;
+    maps = new ArrayList<TezTaskAttemptID>();
+    return currentKnownMaps;
+  }
+  
+  public synchronized void markBusy() {
+    state = State.BUSY;
+  }
+  
+  public synchronized void markPenalized() {
+    state = State.PENALIZED;
+  }
+  
+  public synchronized int getNumKnownMapOutputs() {
+    return maps.size();
+  }
+
+  /**
+   * Called when the node is done with its penalty or done copying.
+   * @return the host's new state
+   */
+  public synchronized State markAvailable() {
+    if (maps.isEmpty()) {
+      state = State.IDLE;
+    } else {
+      state = State.PENDING;
+    }
+    return state;
+  }
+  
+  @Override
+  public String toString() {
+    return hostName;
+  }
+  
+  /**
+   * Mark the host as penalized
+   */
+  public synchronized void penalize() {
+    state = State.PENALIZED;
+  }
+}
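
A short sketch (not part of this commit) of the state transitions the ShuffleScheduler drives through the methods above; the host name, base URL and someMapAttemptId are illustrative:

    MapHost host = new MapHost("node1:8080", "http://node1:8080/mapOutput?job=...");
    // IDLE -> PENDING once an output is known
    host.addKnownMap(someMapAttemptId);
    // PENDING -> BUSY while a Fetcher copies from it
    host.markBusy();
    List<TezTaskAttemptID> toFetch = host.getAndClearKnownMaps();
    // ... fetch toFetch ...
    // BUSY -> IDLE, or PENDING if new outputs arrived in the meantime
    host.markAvailable();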

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.records.TezTaskAttemptID;
+
+class MapOutput {
+  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+  private static AtomicInteger ID = new AtomicInteger(0);
+  
+  public static enum Type {
+    WAIT,
+    MEMORY,
+    DISK
+  }
+  
+  private final int id;
+  
+  private final MergeManager merger;
+  private final TezTaskAttemptID mapId;
+  
+  private final long size;
+  
+  private final byte[] memory;
+  private BoundedByteArrayOutputStream byteStream;
+  
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final Path outputPath;
+  private final OutputStream disk; 
+  
+  private final Type type;
+  
+  private final boolean primaryMapOutput;
+  
+  MapOutput(TezTaskAttemptID mapId, MergeManager merger, long size, 
+            Configuration conf, LocalDirAllocator localDirAllocator,
+            int fetcher, boolean primaryMapOutput, 
+            TezTaskOutputFiles mapOutputFile)
+         throws IOException {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.DISK;
+
+    memory = null;
+    byteStream = null;
+
+    this.size = size;
+    
+    this.localFS = FileSystem.getLocal(conf);
+    outputPath =
+      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
+
+    disk = localFS.create(tmpOutputPath);
+    
+    this.primaryMapOutput = primaryMapOutput;
+  }
+  
+  MapOutput(TezTaskAttemptID mapId, MergeManager merger, int size, 
+            boolean primaryMapOutput) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.MEMORY;
+    byteStream = new BoundedByteArrayOutputStream(size);
+    memory = byteStream.getBuffer();
+
+    this.size = size;
+    
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+    
+    this.primaryMapOutput = primaryMapOutput;
+  }
+
+  public MapOutput(TezTaskAttemptID mapId) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    
+    type = Type.WAIT;
+    merger = null;
+    memory = null;
+    byteStream = null;
+    
+    size = -1;
+    
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+
+    this.primaryMapOutput = false;
+  }
+  
+  public boolean isPrimaryMapOutput() {
+    return primaryMapOutput;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MapOutput) {
+      return id == ((MapOutput)obj).id;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  public Path getOutputPath() {
+    return outputPath;
+  }
+
+  public byte[] getMemory() {
+    return memory;
+  }
+
+  public BoundedByteArrayOutputStream getArrayStream() {
+    return byteStream;
+  }
+  
+  public OutputStream getDisk() {
+    return disk;
+  }
+
+  public TezTaskAttemptID getMapId() {
+    return mapId;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  public void commit() throws IOException {
+    if (type == Type.MEMORY) {
+      merger.closeInMemoryFile(this);
+    } else if (type == Type.DISK) {
+      localFS.rename(tmpOutputPath, outputPath);
+      merger.closeOnDiskFile(outputPath);
+    } else {
+      throw new IOException("Cannot commit MapOutput of type WAIT!");
+    }
+  }
+  
+  public void abort() {
+    if (type == Type.MEMORY) {
+      merger.unreserve(memory.length);
+    } else if (type == Type.DISK) {
+      try {
+        localFS.delete(tmpOutputPath, false);
+      } catch (IOException ie) {
+        LOG.info("failure to clean up " + tmpOutputPath, ie);
+      }
+    } else {
+      throw new IllegalArgumentException
+                   ("Cannot commit MapOutput with of type WAIT!");
+    }
+  }
+  
+  public String toString() {
+    return "MapOutput(" + mapId + ", " + type + ")";
+  }
+  
+  public static class MapOutputComparator 
+  implements Comparator<MapOutput> {
+    public int compare(MapOutput o1, MapOutput o2) {
+      if (o1.id == o2.id) { 
+        return 0;
+      }
+      
+      if (o1.size < o2.size) {
+        return -1;
+      } else if (o1.size > o2.size) {
+        return 1;
+      }
+      
+      if (o1.id < o2.id) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+  }
+  
+}
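
For illustration (not part of this commit), the fetcher-side lifecycle of a MapOutput as reserved through the MergeManager; merger, mapAttemptId, decompressedLength and fetcherId are assumptions of this sketch:

    MapOutput mapOutput = merger.reserve(mapAttemptId, decompressedLength, fetcherId);
    switch (mapOutput.getType()) {
    case WAIT:
      // nothing reserved; try this map again later
      break;
    case MEMORY:
      // fill mapOutput.getMemory() with the map output bytes, then:
      mapOutput.commit();
      break;
    case DISK:
      // stream the bytes to mapOutput.getDisk() and close it, then:
      mapOutput.commit();
      break;
    }
    // on a copy error instead: mapOutput.abort();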