Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:23 UTC

[14/20] Rename tez-engine-api to tez-runtime-api and split tez-engine into 2:
- tez-engine-library for user-visible Input/Output/Processor implementations
- tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
deleted file mode 100644
index a6d1c5b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.shuffle.server;
-
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.common.security.SecureShuffleUtils;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedStream;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class ShuffleHandler extends AuxiliaryService {
-
-  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
-  
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
-  private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
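-  // Chunk size used by sendMapOutput() when streaming map outputs to
-  // fetchers; configured via mapreduce.shuffle.ssl.file.buffer.size.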
-  private int sslFileBufferSize;
-
-  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
-      "mapreduce.shuffle";
-
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static final JobTokenSecretManager secretManager =
-    new JobTokenSecretManager();
-  private SecretKey tokenSecret;
-
-  public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
-  public static final int DEFAULT_SHUFFLE_PORT = 8080;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "mapreduce.shuffle.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
-  private ExternalSorter sorter;
-  
-  @Metrics(about="Shuffle output metrics", context="mapred")
-  static class ShuffleMetrics implements ChannelFutureListener {
-    @Metric("Shuffle output in bytes")
-        MutableCounterLong shuffleOutputBytes;
-    @Metric("# of failed shuffle outputs")
-        MutableCounterInt shuffleOutputsFailed;
-    @Metric("# of succeeeded shuffle outputs")
-        MutableCounterInt shuffleOutputsOK;
-    @Metric("# of current shuffle connections")
-        MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
-
-  final ShuffleMetrics metrics;
-
-  ShuffleHandler(MetricsSystem ms) {
-    super("httpshuffle");
-    metrics = ms.register(new ShuffleMetrics());
-  }
-
-  public ShuffleHandler(ExternalSorter sorter) {
-    this(DefaultMetricsSystem.instance());
-    this.sorter = sorter;
-  }
-
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplicationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by ShuffleHandler.
-   * @param meta the metadata returned by the ShuffleHandler
-   * @return the port the Shuffle Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    int port = in.readInt();
-    return port;
-  }
-
-  /**
-   * A helper function to serialize the JobTokenIdentifier to be sent to the
-   * ShuffleHandler as ServiceData.
-   * @param jobToken the job token to be used for authentication of
-   * shuffle data requests.
-   * @return the serialized version of the jobToken.
-   */
-  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-    jobToken.write(jobToken_dob);
-    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
-  }
-
-  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(secret);
-    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-    jt.readFields(in);
-    return jt;
-  }
-
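-  // Minimal sketch of how the helpers above are meant to be used (variable
-  // names here are illustrative only):
-  //   ByteBuffer sd = ShuffleHandler.serializeServiceData(jobToken); // client
-  //   int shufflePort = ShuffleHandler.deserializeMetaData(meta);    // AM side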
-  
-  @Override
-  public void initializeApplication(
-      ApplicationInitializationContext initAppContext) {
-    // TODO these bytes should be versioned
-    try {
-      String user = initAppContext.getUser();
-      ApplicationId appId = initAppContext.getApplicationId();
-      ByteBuffer secret = initAppContext.getApplicationDataForService();
-      Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
-      // TODO: Once Shuffle is out of NM, this can use MR APIs
-      userRsrc.put(appId.toString(), user);
-      LOG.info("Added token for " + appId.toString());
-      secretManager.addTokenForJob(appId.toString(), jt);
-    } catch (IOException e) {
-      LOG.error("Error during initApp", e);
-      // TODO add API to AuxiliaryServices to report failures
-    }
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext context) {
-    ApplicationId appId = context.getApplicationId();
-    secretManager.removeTokenForJob(appId.toString());
-    userRsrc.remove(appId.toString());
-  }
-
-  public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
-    this.init(new Configuration(conf));
-    tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
-  }
-
-  @Override
-  public synchronized void serviceInit(Configuration conf) {
-    ThreadFactory bossFactory = new ThreadFactoryBuilder()
-      .setNameFormat("ShuffleHandler Netty Boss #%d")
-      .build();
-    ThreadFactory workerFactory = new ThreadFactoryBuilder()
-      .setNameFormat("ShuffleHandler Netty Worker #%d")
-      .build();
-    
-    selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory));    
-  }
-
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public synchronized void serviceStart() {
-    Configuration conf = getConfig();
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
-    // Let OS pick the port
-    Channel ch = bootstrap.bind(new InetSocketAddress(0));
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
-    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
-    LOG.info(getName() + " listening on port " + port);
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-  }
-
-  @Override
-  public synchronized void serviceStop() {
-    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.releaseExternalResources();
-    pipelineFact.destroy();
-  }
-
-  @Override
-  public synchronized ByteBuffer getMetaData() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final Shuffle SHUFFLE;
-    private SSLFactory sslFactory;
-
-    public HttpPipelineFactory(Configuration conf) throws Exception {
-      SHUFFLE = new Shuffle(conf);
-      if (conf.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
-              TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL)) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", SHUFFLE);
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-
-  }
-
-  class Shuffle extends SimpleChannelUpstreamHandler {
-
-    private final Configuration conf;
-    private int port;
-
-    public Shuffle(Configuration conf) {
-      this.conf = conf;
-      this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
-        throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
-      }
-      // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
-        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
-      }
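-      // The fetcher encodes its request as query parameters:
-      //   job=<jobId>&reduce=<partition>&map=<mapId1>,<mapId2>,...
-      // splitMaps() flattens the comma-separated map lists.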
-      final Map<String,List<String>> q =
-        new QueryStringDecoder(request.getUri()).getParameters();
-      final List<String> mapIds = splitMaps(q.get("map"));
-      final List<String> reduceQ = q.get("reduce");
-      final List<String> jobQ = q.get("job");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RECV: " + request.getUri() +
-            "\n  mapId: " + mapIds +
-            "\n  reduceId: " + reduceQ +
-            "\n  jobId: " + jobQ);
-      }
-
-      if (mapIds == null || reduceQ == null || jobQ == null) {
-        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
-        return;
-      }
-      if (reduceQ.size() != 1 || jobQ.size() != 1) {
-        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
-        return;
-      }
-      int reduceId;
-      String jobId;
-      try {
-        reduceId = Integer.parseInt(reduceQ.get(0));
-        jobId = jobQ.get(0);
-      } catch (NumberFormatException e) {
-        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
-        return;
-      } catch (IllegalArgumentException e) {
-        sendError(ctx, "Bad job parameter", BAD_REQUEST);
-        return;
-      }
-
-      final String reqUri = request.getUri();
-      if (null == reqUri) {
-        // TODO? add upstream?
-        sendError(ctx, FORBIDDEN);
-        return;
-      }
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      try {
-        verifyRequest(jobId, ctx, request, response,
-            new URL("http", "", this.port, reqUri));
-      } catch (IOException e) {
-        LOG.warn("Shuffle failure ", e);
-        sendError(ctx, e.getMessage(), UNAUTHORIZED);
-        return;
-      }
-
-      Channel ch = evt.getChannel();
-      ch.write(response);
-      // TODO refactor the following into the pipeline
-      ChannelFuture lastMap = null;
-      for (String mapId : mapIds) {
-        try {
-          // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
-          
-          // TODO NEWTEZ Fix this. TaskAttemptId is no longer valid. mapId validation will not work anymore.
-//          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
-//            String errorMessage =
-//                "Illegal shuffle request mapId: " + mapId
-//                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
-//            LOG.warn(errorMessage);
-//            sendError(ctx, errorMessage, BAD_REQUEST);
-//            return;
-//          }
-
-          lastMap =
-            sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
-          if (null == lastMap) {
-            sendError(ctx, NOT_FOUND);
-            return;
-          }
-        } catch (IOException e) {
-          LOG.error("Shuffle error ", e);
-          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
-          return;
-        }
-      }
-      lastMap.addListener(metrics);
-      lastMap.addListener(ChannelFutureListener.CLOSE);
-    }
-
-    private void verifyRequest(String appid, ChannelHandlerContext ctx,
-        HttpRequest request, HttpResponse response, URL requestUri)
-        throws IOException {
-      if (null == tokenSecret) {
-        LOG.info("Request for unknown token " + appid);
-        throw new IOException("could not find jobid");
-      }
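-      // Handshake: the fetcher sends an HMAC of the request URL, keyed with
-      // the shared job token; verifyReply() recomputes and compares it, and
-      // the reply header below hashes the fetcher's hash so the fetcher can
-      // authenticate this server in turn.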
-      // string to encrypt
-      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
-      // hash from the fetcher
-      String urlHashStr =
-        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
-      if (urlHashStr == null) {
-        LOG.info("Missing header hash for " + appid);
-        throw new IOException("fetcher cannot be authenticated");
-      }
-      if (LOG.isDebugEnabled()) {
-        int len = urlHashStr.length();
-        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
-            urlHashStr.substring(len-len/2, len-1));
-      }
-      // verify - throws exception
-      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
-      // verification passed - encode the reply
-      String reply =
-        SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
-      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
-      addVersionToHeader(response);
-      if (LOG.isDebugEnabled()) {
-        int len = reply.length();
-        LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
-            reply.substring(len-len/2, len-1));
-      }
-    }
-
-    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
-        String user, String jobId, String mapId, int reduce)
-        throws IOException {
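-      // Framing: a ShuffleHeader (map id, data lengths, reduce id) followed
-      // by the sorted partition data, streamed in sslFileBufferSize chunks.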
-      final ShuffleHeader header = sorter.getShuffleHeader(reduce);
-      final DataOutputBuffer dob = new DataOutputBuffer();
-      header.write(dob);
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-
-      ChannelFuture writeFuture =
-          ch.write(
-              new ChunkedStream(
-                  sorter.getSortedStream(reduce), sslFileBufferSize
-                  )
-              );
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(header.getCompressedLength()); // optimistic
-      return writeFuture;
-    }
-
-    private void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    private void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      addVersionToHeader(response);
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-      // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-    }
-    
-    private void addVersionToHeader(HttpResponse response) {
-      // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);      
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
-      if (cause instanceof TooLongFrameException) {
-        sendError(ctx, BAD_REQUEST);
-        return;
-      }
-
-      LOG.error("Shuffle error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("Shuffle error " + e);
-        sendError(ctx, INTERNAL_SERVER_ERROR);
-      }
-    }
-
-  }
-
-  public int getPort() {
-    return port;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
deleted file mode 100644
index 5aa0ddf..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.QuickSort;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.hadoop.compat.NullProgressable;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter {
-
-  private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
-
-  public abstract void close() throws IOException;
-
-  public abstract void flush() throws IOException;
-
-  public abstract void write(Object key, Object value) throws IOException;
-
-  protected Progressable nullProgressable = new NullProgressable();
-  protected TezOutputContext outputContext;
-  protected Combiner combiner;
-  protected Partitioner partitioner;
-  protected Configuration conf;
-  protected FileSystem rfs;
-  protected TezTaskOutput mapOutputFile;
-  protected int partitions;
-  protected Class keyClass;
-  protected Class valClass;
-  protected RawComparator comparator;
-  protected SerializationFactory serializationFactory;
-  protected Serializer keySerializer;
-  protected Serializer valSerializer;
-
-  protected IndexedSorter sorter;
-
-  // Compression for map-outputs
-  protected CompressionCodec codec;
-
-  // Counters
-  // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
-  protected TezCounter mapOutputByteCounter;
-  protected TezCounter mapOutputRecordCounter;
-  protected TezCounter fileOutputByteCounter;
-  protected TezCounter spilledRecordsCounter;
-
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    this.outputContext = outputContext;
-    this.conf = conf;
-    this.partitions = numOutputs;
-
-    rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
-
-    // sorter
-    sorter = ReflectionUtils.newInstance(this.conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
-        IndexedSorter.class), this.conf);
-
-    comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
-
-    // k/v serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
-    serializationFactory = new SerializationFactory(this.conf);
-    keySerializer = serializationFactory.getSerializer(keyClass);
-    valSerializer = serializationFactory.getSerializer(valClass);
-
-    //    counters
-    mapOutputByteCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
-    mapOutputRecordCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
-    spilledRecordsCounter =
-        outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-    // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-    } else {
-      codec = null;
-    }
-
-    // Task outputs
-    mapOutputFile = TezEngineUtils.instantiateTaskOutputManager(conf, outputContext);
-    
-    LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
-    this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
-    this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
-    this.combiner = TezEngineUtils.instantiateCombiner(this.conf, outputContext);
-  }
-
-  /**
-   * Exception indicating that the allocated sort buffer is insufficient to hold
-   * the current record.
-   */
-  @SuppressWarnings("serial")
-  public static class MapBufferTooSmallException extends IOException {
-    public MapBufferTooSmallException(String s) {
-      super(s);
-    }
-  }
-
-  @Private
-  public TezTaskOutput getMapOutput() {
-    return mapOutputFile;
-  }
-
-  protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
-      Writer writer) throws IOException {
-    try {
-      combiner.combine(kvIter, writer);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Rename srcPath to dstPath on the same volume. This is the same as
-   * RawLocalFileSystem's rename method, except that it will not fall back to a
-   * copy, and it will create the target directory if it doesn't exist.
-   */
-  protected void sameVolRename(Path srcPath, Path dstPath) throws IOException {
-    RawLocalFileSystem rfs = (RawLocalFileSystem) this.rfs;
-    File src = rfs.pathToFile(srcPath);
-    File dst = rfs.pathToFile(dstPath);
-    if (!dst.getParentFile().exists()) {
-      if (!dst.getParentFile().mkdirs()) {
-        throw new IOException("Unable to rename " + src + " to " + dst
-            + ": couldn't create parent directory");
-      }
-    }
-
-    if (!src.renameTo(dst)) {
-      throw new IOException("Unable to rename " + src + " to " + dst);
-    }
-  }
-
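-  // Overridden by sorter implementations whose output can be served directly
-  // to fetchers by the in-process ShuffleHandler.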
-  public InputStream getSortedStream(int partition) {
-    throw new UnsupportedOperationException("getSortedStream isn't supported!");
-  }
-
-  public ShuffleHeader getShuffleHeader(int reduce) {
-    throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
deleted file mode 100644
index 7d10606..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.BufferUtils;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.tez.common.counters.TezCounter;
-
-/**
- * <code>IFile</code> is the simple <key-len, value-len, key, value> format
- * for the intermediate map-outputs in Map-Reduce.
- *
- * There is a <code>Writer</code> to write out map-outputs in this format and 
- * a <code>Reader</code> to read files of this format.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFile {
-  private static final Log LOG = LogFactory.getLog(IFile.class);
-  public static final int EOF_MARKER = -1; // End of File Marker
-  public static final int RLE_MARKER = -2; // Repeat same key marker
-  public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
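-  // On-disk record layout (lengths are vints):
-  //   <key-len><value-len><key><value>    normal record
-  //   <RLE_MARKER><value-len><value>      value for the same key as previous
-  //   <EOF_MARKER><EOF_MARKER>            end of stream (checksum trailer follows)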
-    
-  /**
-   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static class Writer {
-    FSDataOutputStream out;
-    boolean ownOutputStream = false;
-    long start = 0;
-    FSDataOutputStream rawOut;
-    AtomicBoolean closed = new AtomicBoolean(false);
-    
-    CompressionOutputStream compressedOut;
-    Compressor compressor;
-    boolean compressOutput = false;
-    
-    long decompressedBytesWritten = 0;
-    long compressedBytesWritten = 0;
-
-    // Count records written to disk
-    private long numRecordsWritten = 0;
-    private final TezCounter writtenRecordsCounter;
-
-    IFileOutputStream checksumOut;
-
-    Class keyClass;
-    Class valueClass;
-    Serializer keySerializer;
-    Serializer valueSerializer;
-    
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    DataOutputBuffer previous = new DataOutputBuffer();
-    
-    // de-dup keys or not
-    private boolean rle = false;
-
-    public Writer(Configuration conf, FileSystem fs, Path file, 
-                  Class keyClass, Class valueClass,
-                  CompressionCodec codec,
-                  TezCounter writesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
-           writesCounter);
-      ownOutputStream = true;
-    }
-    
-    protected Writer(TezCounter writesCounter) {
-      writtenRecordsCounter = writesCounter;
-    }
-
-    public Writer(Configuration conf, FSDataOutputStream out, 
-        Class keyClass, Class valueClass,
-        CompressionCodec codec, TezCounter writesCounter)
-        throws IOException {
-      this.writtenRecordsCounter = writesCounter;
-      this.checksumOut = new IFileOutputStream(out);
-      this.rawOut = out;
-      this.start = this.rawOut.getPos();
-      if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
-        if (this.compressor != null) {
-          this.compressor.reset();
-          this.compressedOut = codec.createOutputStream(checksumOut, compressor);
-          this.out = new FSDataOutputStream(this.compressedOut,  null);
-          this.compressOutput = true;
-        } else {
-          LOG.warn("Could not obtain compressor from CodecPool");
-          this.out = new FSDataOutputStream(checksumOut,null);
-        }
-      } else {
-        this.out = new FSDataOutputStream(checksumOut,null);
-      }
-      
-      this.keyClass = keyClass;
-      this.valueClass = valueClass;
-
-      if (keyClass != null) {
-        SerializationFactory serializationFactory = 
-          new SerializationFactory(conf);
-        this.keySerializer = serializationFactory.getSerializer(keyClass);
-        this.keySerializer.open(buffer);
-        this.valueSerializer = serializationFactory.getSerializer(valueClass);
-        this.valueSerializer.open(buffer);
-      }
-    }
-
-    public Writer(Configuration conf, FileSystem fs, Path file) 
-    throws IOException {
-      this(conf, fs, file, null, null, null, null);
-    }
-
-    public void close() throws IOException {
-      if (closed.getAndSet(true)) {
-        throw new IOException("Writer was already closed earlier");
-      }
-
-      // When IFile writer is created by BackupStore, we do not have
-      // Key and Value classes set. So, check before closing the
-      // serializers
-      if (keyClass != null) {
-        keySerializer.close();
-        valueSerializer.close();
-      }
-
-      // Write EOF_MARKER for key/value length
-      WritableUtils.writeVInt(out, EOF_MARKER);
-      WritableUtils.writeVInt(out, EOF_MARKER);
-      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
-      
-      //Flush the stream
-      out.flush();
-  
-      if (compressOutput) {
-        // Flush
-        compressedOut.finish();
-        compressedOut.resetState();
-      }
-      
-      // Close the underlying stream iff we own it...
-      if (ownOutputStream) {
-        out.close();
-      }
-      else {
-        // Write the checksum
-        checksumOut.finish();
-      }
-
-      compressedBytesWritten = rawOut.getPos() - start;
-
-      if (compressOutput) {
-        // Return back the compressor
-        CodecPool.returnCompressor(compressor);
-        compressor = null;
-      }
-
-      out = null;
-      if(writtenRecordsCounter != null) {
-        writtenRecordsCounter.increment(numRecordsWritten);
-      }
-    }
-
-    public void append(Object key, Object value) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+ key.getClass()
-                              +" is not "+ keyClass);
-      if (value.getClass() != valueClass)
-        throw new IOException("wrong value class: "+ value.getClass()
-                              +" is not "+ valueClass);
-      
-      boolean sameKey = false;
-
-      // Append the 'key'
-      keySerializer.serialize(key);
-      int keyLength = buffer.getLength();
-      if (keyLength < 0) {
-        throw new IOException("Negative key-length not allowed: " + keyLength + 
-                              " for " + key);
-      }     
-      
-      if(keyLength == previous.getLength()) {
-        sameKey = (BufferUtils.compare(previous, buffer) == 0);       
-      }
-      
-      if(!sameKey) {
-        BufferUtils.copy(buffer, previous);
-      }
-
-      // Append the 'value'
-      valueSerializer.serialize(value);
-      int valueLength = buffer.getLength() - keyLength;
-      if (valueLength < 0) {
-        throw new IOException("Negative value-length not allowed: " + 
-                              valueLength + " for " + value);
-      }
-      
-      if(sameKey) {        
-        WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
-        WritableUtils.writeVInt(out, valueLength);                  // value length
-        out.write(buffer.getData(), keyLength, valueLength);       // only the value
-        // Update bytes written
-        decompressedBytesWritten += 0 + valueLength + 
-                                    WritableUtils.getVIntSize(RLE_MARKER) + 
-                                    WritableUtils.getVIntSize(valueLength);
-      } else {        
-        // Write the record out        
-        WritableUtils.writeVInt(out, keyLength);                  // key length
-        WritableUtils.writeVInt(out, valueLength);                // value length
-        out.write(buffer.getData(), 0, buffer.getLength());       // data
-        // Update bytes written
-        decompressedBytesWritten += keyLength + valueLength + 
-                                    WritableUtils.getVIntSize(keyLength) + 
-                                    WritableUtils.getVIntSize(valueLength);
-      }
-
-      // Reset
-      buffer.reset();
-      
-      
-      ++numRecordsWritten;
-    }
-    
-    public void append(DataInputBuffer key, DataInputBuffer value)
-    throws IOException {
-      int keyLength = key.getLength() - key.getPosition();
-      if (keyLength < 0) {
-        throw new IOException("Negative key-length not allowed: " + keyLength + 
-                              " for " + key);
-      }
-      
-      int valueLength = value.getLength() - value.getPosition();
-      if (valueLength < 0) {
-        throw new IOException("Negative value-length not allowed: " + 
-                              valueLength + " for " + value);
-      }
-      
-      boolean sameKey = false;
-      
-      if(rle && keyLength == previous.getLength()) {
-        sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);        
-      }
-      
-      if(rle && sameKey) {
-        WritableUtils.writeVInt(out, RLE_MARKER);
-        WritableUtils.writeVInt(out, valueLength);        
-        out.write(value.getData(), value.getPosition(), valueLength);
-
-        // Update bytes written
-        decompressedBytesWritten += 0 + valueLength
-            + WritableUtils.getVIntSize(RLE_MARKER)
-            + WritableUtils.getVIntSize(valueLength);
-      } else {
-        WritableUtils.writeVInt(out, keyLength);
-        WritableUtils.writeVInt(out, valueLength);
-        out.write(key.getData(), key.getPosition(), keyLength);
-        out.write(value.getData(), value.getPosition(), valueLength);
-
-        // Update bytes written
-        decompressedBytesWritten += keyLength + valueLength
-            + WritableUtils.getVIntSize(keyLength)
-            + WritableUtils.getVIntSize(valueLength);
-                
-        BufferUtils.copy(key, previous);        
-      }
-      ++numRecordsWritten;
-    }
-    
-    // Required for mark/reset
-    public DataOutputStream getOutputStream () {
-      return out;
-    }
-    
-    // Required for mark/reset
-    public void updateCountersForExternalAppend(long length) {
-      ++numRecordsWritten;
-      decompressedBytesWritten += length;
-    }
-    
-    public long getRawLength() {
-      return decompressedBytesWritten;
-    }
-    
-    public long getCompressedLength() {
-      return compressedBytesWritten;
-    }
-    
-    public void setRLE(boolean rle) {
-      this.rle = rle;
-      previous.reset();
-    }
-
-  }
-
-  /**
-   * <code>IFile.Reader</code> to read intermediate map-outputs. 
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class Reader {
-    
-    public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
-    
-    private static final int DEFAULT_BUFFER_SIZE = 128*1024;
-
-    // Count records read from disk
-    private long numRecordsRead = 0;
-    private final TezCounter readRecordsCounter;
-
-    final InputStream in;        // Possibly decompressed stream that we read
-    Decompressor decompressor;
-    public long bytesRead = 0;
-    protected final long fileLength;
-    protected boolean eof = false;
-    final IFileInputStream checksumIn;
-    
-    protected byte[] buffer = null;
-    protected int bufferSize = DEFAULT_BUFFER_SIZE;
-    protected DataInputStream dataIn;
-
-    protected int recNo = 1;
-    protected int prevKeyLength;
-    protected int currentKeyLength;
-    protected int currentValueLength;
-    byte keyBytes[] = new byte[0];
-    
-    
-    /**
-     * Construct an IFile Reader.
-     * 
-     * @param conf Configuration File 
-     * @param fs  FileSystem
-     * @param file Path of the file to be opened. This file should have
-     *             checksum bytes for the data at the end of the file.
-     * @param codec codec
-     * @param readsCounter Counter for records read from disk
-     * @throws IOException
-     */
-    public Reader(Configuration conf, FileSystem fs, Path file,
-                  CompressionCodec codec,
-                  TezCounter readsCounter) throws IOException {
-      this(conf, fs.open(file), 
-           fs.getFileStatus(file).getLen(),
-           codec, readsCounter);
-    }
-
-    /**
-     * Construct an IFile Reader.
-     * 
-     * @param conf Configuration File 
-     * @param in   The input stream
-     * @param length Length of the data in the stream, including the checksum
-     *               bytes.
-     * @param codec codec
-     * @param readsCounter Counter for records read from disk
-     * @throws IOException
-     */
-    public Reader(Configuration conf, InputStream in, long length, 
-                  CompressionCodec codec,
-                  TezCounter readsCounter) throws IOException {
-      readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length, conf);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-        if (decompressor != null) {
-          this.in = codec.createInputStream(checksumIn, decompressor);
-        } else {
-          LOG.warn("Could not obtain decompressor from CodecPool");
-          this.in = checksumIn;
-        }
-      } else {
-        this.in = checksumIn;
-      }
-      this.dataIn = new DataInputStream(this.in);
-      this.fileLength = length;
-      
-      if (conf != null) {
-        bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
-      }
-    }
-    
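-    /** @return the length of the data, excluding the checksum trailer. */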
-    public long getLength() { 
-      return fileLength - checksumIn.getSize();
-    }
-    
-    public long getPosition() throws IOException {    
-      return checksumIn.getPosition(); 
-    }
-    
-    /**
-     * Read up to len bytes into buf starting at offset off.
-     * 
-     * @param buf buffer 
-     * @param off offset
-     * @param len length of buffer
-     * @return the no. of bytes read
-     * @throws IOException
-     */
-    private int readData(byte[] buf, int off, int len) throws IOException {
-      int bytesRead = 0;
-      while (bytesRead < len) {
-        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
-            len - bytesRead);
-        if (n < 0) {
-          return bytesRead;
-        }
-        bytesRead += n;
-      }
-      return len;
-    }
-    
-    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
-      // Sanity check
-      if (eof) {
-        throw new EOFException("Completed reading " + bytesRead);
-      }
-      
-      // Read key and value lengths
-      prevKeyLength = currentKeyLength;
-      currentKeyLength = WritableUtils.readVInt(dIn);
-      currentValueLength = WritableUtils.readVInt(dIn);
-      bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
-                   WritableUtils.getVIntSize(currentValueLength);
-      
-      // Check for EOF
-      if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
-        eof = true;
-        return false;
-      }      
-      
-      // Sanity check
-      if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
-                              currentKeyLength);
-      }
-      if (currentValueLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
-                              currentValueLength);
-      }
-            
-      return true;
-    }
-    
-    public boolean nextRawKey(DataInputBuffer key) throws IOException {
-      return readRawKey(key) != KeyState.NO_KEY;
-    }
-    
-    public KeyState readRawKey(DataInputBuffer key) throws IOException {
-      if (!positionToNextRecord(dataIn)) {
-        return KeyState.NO_KEY;
-      }
-      if(currentKeyLength == RLE_MARKER) {
-        currentKeyLength = prevKeyLength;
-        // no data to read
-        key.reset(keyBytes, currentKeyLength);
-        return KeyState.SAME_KEY;
-      }
-      if (keyBytes.length < currentKeyLength) {
-        keyBytes = new byte[currentKeyLength << 1];
-      }
-      int i = readData(keyBytes, 0, currentKeyLength);
-      if (i != currentKeyLength) {
-        throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
-      }
-      key.reset(keyBytes, currentKeyLength);
-      bytesRead += currentKeyLength;
-      return KeyState.NEW_KEY;
-    }
-    
-    public void nextRawValue(DataInputBuffer value) throws IOException {
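-      // Reuse the caller's array when it is large enough and does not alias
-      // keyBytes (which must stay intact for RLE same-key reuse); otherwise
-      // grow it 2x.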
-      final byte[] valBytes = 
-        ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
-        ? new byte[currentValueLength << 1]
-        : value.getData();
-      int i = readData(valBytes, 0, currentValueLength);
-      if (i != currentValueLength) {
-        throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
-      }
-      value.reset(valBytes, currentValueLength);
-      
-      // Record the bytes read
-      bytesRead += currentValueLength;
-
-      ++recNo;
-      ++numRecordsRead;
-    }
-    
-    public void close() throws IOException {
-      // Close the underlying stream
-      in.close();
-      
-      // Release the buffer
-      dataIn = null;
-      buffer = null;
-      if(readRecordsCounter != null) {
-        readRecordsCounter.increment(numRecordsRead);
-      }
-
-      // Return the decompressor
-      if (decompressor != null) {
-        decompressor.reset();
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-    }
-    
-    public void reset(int offset) {
-      return;
-    }
-
-    public void disableChecksumValidation() {
-      checksumIn.disableChecksumValidation();
-    }
-
-  }    
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
deleted file mode 100644
index dfb69f1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.HasFileDescriptor;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezJobConfig;
-/**
- * A checksum input stream, used for IFiles.
- * Used to validate the checksum of files created by {@link IFileOutputStream}. 
-*/
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFileInputStream extends InputStream {
-  
-  private final InputStream in; //The input stream to be verified for checksum.
-  private final FileDescriptor inFd; // the file descriptor, if it is known
-  private final long length; //The total length of the input file
-  private final long dataLength;
-  private DataChecksum sum;
-  private long currentOffset = 0;
-  private final byte b[] = new byte[1];
-  private byte csum[] = null;
-  private int checksumSize;
-  private byte[] buffer;
-  private int offset;
-
-  private ReadaheadRequest curReadahead = null;
-  private ReadaheadPool raPool = ReadaheadPool.getInstance();
-  private boolean readahead;
-  private int readaheadLength;
-
-  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
-
-  private boolean disableChecksumValidation = false;
-  
-  /**
-   * Create a checksum input stream that reads
-   * @param in The input stream to be verified for checksum.
-   * @param len The length of the input stream including checksum bytes.
-   */
-  public IFileInputStream(InputStream in, long len, Configuration conf) {
-    this.in = in;
-    this.inFd = getFileDescriptorIfAvail(in);
-    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
-        Integer.MAX_VALUE);
-    checksumSize = sum.getChecksumSize();
-    buffer = new byte[4096];
-    offset = 0;
-    length = len;
-    dataLength = length - checksumSize;
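-    // The last checksumSize bytes of the stream are the CRC32 trailer; only
-    // the first dataLength bytes are payload.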
-
-    conf = (conf != null) ? conf : new Configuration();
-    readahead = conf.getBoolean(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD);
-    readaheadLength = conf.getInt(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES);
-
-    doReadahead();
-  }
-
-  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
-    FileDescriptor fd = null;
-    try {
-      if (in instanceof HasFileDescriptor) {
-        fd = ((HasFileDescriptor)in).getFileDescriptor();
-      } else if (in instanceof FileInputStream) {
-        fd = ((FileInputStream)in).getFD();
-      }
-    } catch (IOException e) {
-      LOG.info("Unable to determine FileDescriptor", e);
-    }
-    return fd;
-  }
-
-  /**
-   * Close the input stream. Note that we need to read to the end of the
-   * stream to validate the checksum.
-   */
-  @Override
-  public void close() throws IOException {
-
-    if (curReadahead != null) {
-      curReadahead.cancel();
-    }
-    if (currentOffset < dataLength) {
-      byte[] t = new byte[Math.min((int)
-            (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
-      while (currentOffset < dataLength) {
-        int n = read(t, 0, t.length);
-        if (0 == n) {
-          throw new EOFException("Could not validate checksum");
-        }
-      }
-    }
-    in.close();
-  }
-  
-  @Override
-  public long skip(long n) throws IOException {
-    throw new IOException("Skip not supported for IFileInputStream");
-  }
-  
-  public long getPosition() {
-    return (currentOffset >= dataLength) ? dataLength : currentOffset;
-  }
-  
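-  /**
-   * Note: despite its name, this returns the size of the checksum trailer,
-   * not the length of the stream.
-   */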
-  public long getSize() {
-    return checksumSize;
-  }
-
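-  // Small reads are staged in a 4KB scratch buffer and folded into the CRC
-  // lazily; reads that are large, or that would overflow the buffer, flush
-  // the pending bytes and are checksummed directly.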
-  private void checksum(byte[] b, int off, int len) {
-    if (len >= buffer.length) {
-      sum.update(buffer, 0, offset);
-      offset = 0;
-      sum.update(b, off, len);
-      return;
-    }
-    final int remaining = buffer.length - offset;
-    if (len > remaining) {
-      sum.update(buffer, 0, offset);
-      offset = 0;
-    }
-    /* now we should have len < buffer.length */
-    System.arraycopy(b, off, buffer, offset, len);
-    offset += len;
-  }
-  
-  /**
-   * Read bytes from the stream.
-   * At EOF, the checksum is validated, but the checksum
-   * bytes are not passed back in the buffer.
-   */
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-
-    if (currentOffset >= dataLength) {
-      return -1;
-    }
-
-    doReadahead();
-
-    return doRead(b, off, len);
-  }
-
-  private void doReadahead() {
-    if (raPool != null && inFd != null && readahead) {
-      curReadahead = raPool.readaheadStream(
-          "ifile", inFd,
-          currentOffset, readaheadLength, dataLength,
-          curReadahead);
-    }
-  }
-
-  /**
-   * Read bytes from the stream.
-   * At EOF, the checksum is validated and returned
-   * as the last four bytes of the buffer. The caller should handle
-   * these bytes appropriately.
-   */
-  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
-
-    if (currentOffset == length) {
-      return -1;
-    }
-    else if (currentOffset >= dataLength) {
-      // If the previous read drained off all the data, then just return
-      // the checksum now. Note that checksum validation would have 
-      // happened in the earlier read
-      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
-      if (len < lenToCopy) {
-        lenToCopy = len;
-      }
-      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off, 
-          lenToCopy);
-      currentOffset += lenToCopy;
-      return lenToCopy;
-    }
-
-    int bytesRead = doRead(b,off,len);
-
-    if (currentOffset == dataLength) {
-      if (len >= bytesRead + checksumSize) {
-        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
-        bytesRead += checksumSize;
-        currentOffset += checksumSize;
-      }
-    }
-    return bytesRead;
-  }
-
-  private int doRead(byte[] b, int off, int len) throws IOException {
-
-    // If we are trying to read past the end of the data, just read
-    // the leftover data
-    if (currentOffset + len > dataLength) {
-      len = (int) (dataLength - currentOffset);
-    }
-    
-    int bytesRead = in.read(b, off, len);
-
-    if (bytesRead < 0) {
-      throw new ChecksumException("Checksum Error", 0);
-    }
-
-    checksum(b, off, bytesRead);
-
-    currentOffset += bytesRead;
-
-    if (disableChecksumValidation) {
-      return bytesRead;
-    }
-    
-    if (currentOffset == dataLength) {
-      // The last four bytes are checksum. Strip them and verify
-      sum.update(buffer, 0, offset);
-      csum = new byte[checksumSize];
-      IOUtils.readFully(in, csum, 0, checksumSize);
-      if (!sum.compare(csum, 0)) {
-        throw new ChecksumException("Checksum Error", 0);
-      }
-    }
-    return bytesRead;
-  }
-
-
-  @Override
-  public int read() throws IOException {
-    b[0] = 0;
-    int l = read(b, 0, 1);
-    if (l < 0) {
-      return l;
-    }
-
-    // Widen b[0] to an int so that the high bit of the byte is not
-    // misinterpreted as a sign bit
-    return 0xFF & b[0];
-  }
-
-  public byte[] getChecksum() {
-    return csum;
-  }
-
-  void disableChecksumValidation() {
-    disableChecksumValidation = true;
-  }
-}
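
For context, here is a minimal round-trip sketch of how the two deleted classes
were used together (not part of the diff; the file path and payload are
illustrative, and it assumes both classes are on the classpath):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    public class IFileChecksumRoundTrip {
      public static void main(String[] args) throws IOException {
        File f = new File("/tmp/ifile-demo");
        byte[] payload = "hello ifile".getBytes("UTF-8");

        // Write: the CRC32 of the payload is appended when close() runs finish().
        IFileOutputStream out = new IFileOutputStream(new FileOutputStream(f));
        out.write(payload, 0, payload.length);
        out.close();

        // Read: the length passed in must include the checksum trailer; the
        // checksum is verified once all data bytes have been consumed.
        IFileInputStream in = new IFileInputStream(
            new FileInputStream(f), f.length(), new Configuration());
        byte[] back = new byte[payload.length];
        int n = in.read(back, 0, back.length);
        in.close(); // drains to EOF and validates the checksum if needed
        System.out.println("read " + n + " bytes with a valid checksum");
      }
    }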

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
deleted file mode 100644
index 3b39900..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.DataChecksum;
-/**
- * A checksum output stream.
- * A checksum for the contents of the file is calculated and
- * appended to the end of the file when the stream is closed.
- * Used for IFiles.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFileOutputStream extends FilterOutputStream {
-
-  /**
-   * The checksum calculator for the bytes written to this stream.
-   */
-  private final DataChecksum sum;
-  private byte[] barray;
-  private byte[] buffer;
-  private int offset;
-  private boolean closed = false;
-  private boolean finished = false;
-
-  /**
-   * Create a checksum output stream that writes
-   * the bytes to the given stream.
-   * @param out the underlying stream to which data and the checksum are written
-   */
-  public IFileOutputStream(OutputStream out) {
-    super(out);
-    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
-        Integer.MAX_VALUE);
-    barray = new byte[sum.getChecksumSize()];
-    buffer = new byte[4096];
-    offset = 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    finish();
-    out.close();
-  }
-
-  /**
-   * Finishes writing data to the output stream by writing
-   * the checksum bytes to the end. The underlying stream is not closed.
-   * @throws IOException
-   */
-  public void finish() throws IOException {
-    if (finished) {
-      return;
-    }
-    finished = true;
-    sum.update(buffer, 0, offset);
-    sum.writeValue(barray, 0, false);
-    out.write(barray, 0, sum.getChecksumSize());
-    out.flush();
-  }
-
-  private void checksum(byte[] b, int off, int len) {
-    if (len >= buffer.length) {
-      sum.update(buffer, 0, offset);
-      offset = 0;
-      sum.update(b, off, len);
-      return;
-    }
-    final int remaining = buffer.length - offset;
-    if (len > remaining) {
-      sum.update(buffer, 0, offset);
-      offset = 0;
-    }
-    /* now we should have len < buffer.length */
-    System.arraycopy(b, off, buffer, offset, len);
-    offset += len;
-  }
-
-  /**
-   * Write bytes to the stream.
-   */
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    checksum(b, off, len);
-    out.write(b, off, len);
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    barray[0] = (byte) (b & 0xFF);
-    write(barray, 0, 1);
-  }
-
-}
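
The buffered-checksum pattern shared by both classes can be illustrated on its
own with java.util.zip.CRC32 (a standalone sketch, not Tez code; the class and
method names here are invented for the example):

    import java.util.zip.CRC32;

    // Small chunks are staged in a scratch buffer; chunks that are large, or
    // that would overflow the buffer, flush the staged bytes into the CRC and
    // are folded in directly. This keeps per-call CRC overhead low for the
    // many small writes an IFile produces.
    public class LazyCrcBuffer {
      private final CRC32 crc = new CRC32();
      private final byte[] buffer = new byte[4096];
      private int offset = 0;

      public void update(byte[] b, int off, int len) {
        if (len >= buffer.length) {          // large chunk: flush, then fold directly
          crc.update(buffer, 0, offset);
          offset = 0;
          crc.update(b, off, len);
          return;
        }
        if (len > buffer.length - offset) {  // would overflow: flush staged bytes
          crc.update(buffer, 0, offset);
          offset = 0;
        }
        System.arraycopy(b, off, buffer, offset, len); // stage the small chunk
        offset += len;
      }

      public long value() {                  // flush the remainder and read the CRC
        crc.update(buffer, 0, offset);
        offset = 0;
        return crc.getValue();
      }
    }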