Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/08 09:29:13 UTC

[1/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.

Repository: tajo
Updated Branches:
  refs/heads/master facd1ddcc -> b5aa78046


http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
deleted file mode 100644
index 1c63c8a..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ /dev/null
@@ -1,654 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class PullServerAuxService extends AuxiliaryService {
-
-  private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);
-  
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
-  private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
-  private int sslFileBufferSize;
-
-  private ApplicationId appId;
-  private QueryId queryId;
-  private FileSystem localFS;
-
-  /**
-   * Should the shuffle use posix_fadvise calls to manage the OS cache during
-   * sendfile
-   */
-  private boolean manageOsCache;
-  private int readaheadLength;
-  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-   
-
-  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static String userName;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "tajo.pullserver.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
-  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
-  static class ShuffleMetrics implements ChannelFutureListener {
-    @Metric({"OutputBytes","PullServer output in bytes"})
-    MutableCounterLong shuffleOutputBytes;
-    @Metric({"Failed","# of failed shuffle outputs"})
-    MutableCounterInt shuffleOutputsFailed;
-    @Metric({"Succeeded","# of succeeded shuffle outputs"})
-    MutableCounterInt shuffleOutputsOK;
-    @Metric({"Connections","# of current shuffle connections"})
-    MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
-
-  final ShuffleMetrics metrics;
-
-  PullServerAuxService(MetricsSystem ms) {
-    super("httpshuffle");
-    metrics = ms.register(new ShuffleMetrics());
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public PullServerAuxService() {
-    this(DefaultMetricsSystem.instance());
-  }
-
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplicationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by PullServerAuxService.
-   * @param meta the metadata returned by the PullServerAuxService
-   * @return the port the PullServer Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    return in.readInt();
-  }
-
-  @Override
-  public void initializeApplication(ApplicationInitializationContext appInitContext) {
-    // TODO these bytes should be versioned
-  // TODO: Once Shuffle is out of NM, this can use MR APIs
-    this.appId = appInitContext.getApplicationId();
-    this.queryId = TajoIdUtils.parseQueryId(appId.toString());
-    this.userName = appInitContext.getUser();
-    userRsrc.put(this.appId.toString(), this.userName);
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext appStopContext) {
-    userRsrc.remove(appStopContext.getApplicationId().toString());
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    try {
-      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-          DEFAULT_SHUFFLE_READAHEAD_BYTES);
-
-      ThreadFactory bossFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Boss #%d")
-          .build();
-      ThreadFactory workerFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Worker #%d")
-          .build();
-
-      selector = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(bossFactory),
-          Executors.newCachedThreadPool(workerFactory));
-
-      localFS = new LocalFileSystem();
-      super.init(new Configuration(conf));
-    } catch (Throwable t) {
-      LOG.error(t);
-    }
-  }
-
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public synchronized void start() {
-    Configuration conf = getConfig();
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
-    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-        ConfVars.PULLSERVER_PORT.defaultIntVal);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
-    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    pipelineFact.PullServer.setPort(port);
-    LOG.info(getName() + " listening on port " + port);
-    super.start();
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public synchronized void stop() {
-    try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
-      pipelineFact.destroy();
-
-      localFS.close();
-    } catch (Throwable t) {
-      LOG.error(t);
-    } finally {
-      super.stop();
-    }
-  }
-
-  @Override
-  public synchronized ByteBuffer getMetaData() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final PullServer PullServer;
-    private SSLFactory sslFactory;
-
-    public HttpPipelineFactory(Configuration conf) throws Exception {
-      PullServer = new PullServer(conf);
-      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
-          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", PullServer);
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-  }
-
-  class PullServer extends SimpleChannelUpstreamHandler {
-    private final Configuration conf;
-    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-    private int port;
-
-    public PullServer(Configuration conf) {
-      this.conf = conf;
-      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
-    }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != GET) {
-        sendError(ctx, METHOD_NOT_ALLOWED);
-        return;
-      }
-
-      // Parsing the URL into key-values
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(request.getUri()).getParameters();
-      final List<String> types = params.get("type");
-      final List<String> taskIdList = params.get("ta");
-      final List<String> subQueryIds = params.get("sid");
-      final List<String> partitionIds = params.get("p");
-
-      if (types == null || taskIdList == null || subQueryIds == null
-          || partitionIds == null) {
-        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
-            BAD_REQUEST);
-        return;
-      }
-
-      if (types.size() != 1 || subQueryIds.size() != 1) {
-        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
-            BAD_REQUEST);
-        return;
-      }
-
-      final List<FileChunk> chunks = Lists.newArrayList();
-
-      String repartitionType = types.get(0);
-      String sid = subQueryIds.get(0);
-      String partitionId = partitionIds.get(0);
-      List<String> taskIds = splitMaps(taskIdList);
-
-      // the working dir of tajo worker for each query
-      String queryBaseDir = queryId + "/output" + "/";
-
-      LOG.info("PullServer request param: repartitionType=" + repartitionType +
-          ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
-
-      String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
-      if (taskLocalDir == null ||
-          taskLocalDir.equals("")) {
-        LOG.error("Tajo local directory should be specified.");
-      }
-      LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
-
-      // if a subquery requires a range partitioning
-      if (repartitionType.equals("r")) {
-        String ta = taskIds.get(0);
-        Path path = localFS.makeQualified(
-            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
-                + ta + "/output/", conf));
-
-        String startKey = params.get("start").get(0);
-        String endKey = params.get("end").get(0);
-        boolean last = params.get("final") != null;
-
-        FileChunk chunk;
-        try {
-          chunk = getFileCunks(path, startKey, endKey, last);
-        } catch (Throwable t) {
-          LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
-          return;
-        }
-        if (chunk != null) {
-          chunks.add(chunk);
-        }
-
-        // if a subquery requires a hash repartition  or a scattered hash repartition
-      } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
-        for (String ta : taskIds) {
-          Path path = localFS.makeQualified(
-              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
-                  ta + "/output/" + partitionId, conf));
-          File file = new File(path.toUri());
-          FileChunk chunk = new FileChunk(file, 0, file.length());
-          chunks.add(chunk);
-        }
-      } else {
-        LOG.error("Unknown repartition type: " + repartitionType);
-        return;
-      }
-
-      // Write the content.
-      Channel ch = e.getChannel();
-      if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-        ch.write(response);
-        if (!isKeepAlive(request)) {
-          ch.close();
-        }
-      }  else {
-        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-        long totalSize = 0;
-        for (FileChunk chunk : file) {
-          totalSize += chunk.length();
-        }
-        setContentLength(response, totalSize);
-
-        // Write the initial line and the header.
-        ch.write(response);
-
-        ChannelFuture writeFuture = null;
-
-        for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, ch, chunk);
-          if (writeFuture == null) {
-            sendError(ctx, NOT_FOUND);
-            return;
-          }
-        }
-
-        // Decide whether to close the connection or not.
-        if (!isKeepAlive(request)) {
-          // Close the connection when the whole content is written out.
-          writeFuture.addListener(ChannelFutureListener.CLOSE);
-        }
-      }
-    }
-
-    private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   Channel ch,
-                                   FileChunk file) throws IOException {
-      RandomAccessFile spill;
-      try {
-        spill = new RandomAccessFile(file.getFile(), "r");
-      } catch (FileNotFoundException e) {
-        LOG.info(file.getFile() + " not found");
-        return null;
-      }
-      ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            file.startOffset(), file.length(), manageOsCache, readaheadLength,
-            readaheadPool, file.getFile().getAbsolutePath());
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            file.startOffset(), file.length(), sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            file.getFile().getAbsolutePath());
-        writeFuture = ch.write(chunk);
-      }
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
-      return writeFuture;
-    }
-
-    private void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    private void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-
-      // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
-      if (cause instanceof TooLongFrameException) {
-        sendError(ctx, BAD_REQUEST);
-        return;
-      }
-
-      LOG.error("PullServer error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("PullServer error " + e);
-        sendError(ctx, INTERNAL_SERVER_ERROR);
-      }
-    }
-  }
-
-  public FileChunk getFileCunks(Path outDir,
-                                      String startKey,
-                                      String endKey,
-                                      boolean last) throws IOException {
-    BSTIndex index = new BSTIndex(new TajoConf());
-    BSTIndex.BSTIndexReader idxReader =
-        index.getIndexReader(new Path(outDir, "index"));
-    idxReader.open();
-    Schema keySchema = idxReader.getKeySchema();
-    TupleComparator comparator = idxReader.getComparator();
-
-    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
-        + idxReader.getLastKey());
-
-    File data = new File(URI.create(outDir.toUri() + "/output"));
-    byte [] startBytes = Base64.decodeBase64(startKey);
-    byte [] endBytes = Base64.decodeBase64(endKey);
-
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-    Tuple start;
-    Tuple end;
-    try {
-      start = decoder.toTuple(startBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("StartKey: " + startKey
-          + ", decoded byte size: " + startBytes.length, t);
-    }
-
-    try {
-      end = decoder.toTuple(endBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("EndKey: " + endKey
-          + ", decoded byte size: " + endBytes.length, t);
-    }
-
-
-    if(!comparator.isAscendingFirstKey()) {
-      Tuple tmpKey = start;
-      start = end;
-      end = tmpKey;
-    }
-
-    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
-        (last ? ", last=true" : "") + ")");
-
-    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-      LOG.info("There is no contents");
-      return null;
-    }
-
-    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-        comparator.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-          "], but request start:" + start + ", end: " + end);
-      return null;
-    }
-
-    long startOffset;
-    long endOffset;
-    try {
-      startOffset = idxReader.find(start);
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-    try {
-      endOffset = idxReader.find(end);
-      if (endOffset == -1) {
-        endOffset = idxReader.find(end, true);
-      }
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-
-    // if startOffset == -1 then case 2-1 or case 3
-    if (startOffset == -1) { // this is a hack
-      // if case 2-1 or case 3
-      try {
-        startOffset = idxReader.find(start, true);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end +")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-    }
-
-    if (startOffset == -1) {
-      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-          "State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-    }
-
-    // if greater than indexed values
-    if (last || (endOffset == -1
-        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-      endOffset = data.length();
-    }
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-    LOG.info("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-}
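
The serializeMetaData/deserializeMetaData pair above is the whole contract between the
pull server and YARN's auxiliary-service plumbing: the bound port is written as a single
unversioned int into a ByteBuffer and read back on the other side. Here is that round
trip in isolation, as a sketch assuming only the Hadoop io classes already imported above
(the class name PortMetaRoundTrip and the main method are illustrative, not part of Tajo):

    import org.apache.hadoop.io.DataInputByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class PortMetaRoundTrip {
      // Mirrors PullServerAuxService.serializeMetaData: one unversioned int.
      static ByteBuffer serialize(int port) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        out.writeInt(port);
        return ByteBuffer.wrap(out.getData(), 0, out.getLength());
      }

      // Mirrors deserializeMetaData: reset the buffer, read the int back.
      static int deserialize(ByteBuffer meta) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(meta);
        return in.readInt();
      }

      public static void main(String[] args) throws IOException {
        System.out.println(deserialize(serialize(8080))); // prints 8080
      }
    }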

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
deleted file mode 100644
index 564950f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.nativeio.NativeIO;
-
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
-
-public class PullServerUtil {
-  private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
-
-  private static boolean nativeIOPossible = false;
-  private static Method posixFadviseIfPossible;
-
-  static {
-    if (NativeIO.isAvailable() && loadNativeIO()) {
-      nativeIOPossible = true;
-    } else {
-      LOG.warn("Unable to load hadoop nativeIO");
-    }
-  }
-
-  public static boolean isNativeIOPossible() {
-    return nativeIOPossible;
-  }
-
-  /**
-   * Call posix_fadvise on the given file descriptor. See the manpage
-   * for this syscall for more information. On systems where this
-   * call is not available, does nothing.
-   */
-  public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
-                                            long offset, long len, int flags) {
-    if (nativeIOPossible) {
-      try {
-        posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
-      } catch (Throwable t) {
-        nativeIOPossible = false;
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
-      }
-    }
-  }
-
-  /* load hadoop native method if possible */
-  private static boolean loadNativeIO() {
-    boolean loaded = true;
-    if (nativeIOPossible) return loaded;
-
-    Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
-    try {
-      Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
-      Class posixClass;
-      if (getCacheManipulator != null) {
-        Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
-        posixClass = posix.getClass();
-      } else {
-        posixClass = NativeIO.POSIX.class;
-      }
-      posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
-    } catch (Throwable e) {
-      loaded = false;
-      LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
-    }
-
-    if (posixFadviseIfPossible == null) {
-      loaded = false;
-    }
-    return loaded;
-  }
-}
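
PullServerUtil's static initializer is a guarded-reflection shim: it resolves the native
posix_fadvise hook once, and every later call degrades to a no-op if the method is missing
or ever throws. A simplified sketch of the same pattern follows; it reflects only the
static NativeIO.POSIX variant, whereas the real code also probes getCacheManipulator
because the method moved between Hadoop versions (the class name OptionalFadvise is
illustrative; the reflected signature matches the parameters array above):

    import java.io.FileDescriptor;
    import java.lang.reflect.Method;

    public class OptionalFadvise {
      private static Method target; // stays null when the native hook is unavailable

      static {
        try {
          Class<?> posix = Class.forName("org.apache.hadoop.io.nativeio.NativeIO$POSIX");
          target = posix.getMethod("posixFadviseIfPossible",
              String.class, FileDescriptor.class, long.class, long.class, int.class);
        } catch (Throwable t) {
          target = null; // method absent or not public on this Hadoop version: disable
        }
      }

      static void fadviseIfPossible(String id, FileDescriptor fd,
                                    long offset, long len, int flags) {
        if (target == null) return; // nothing to do without the native hook
        try {
          target.invoke(null, id, fd, offset, len, flags);
        } catch (Throwable t) {
          target = null; // disable after the first failure, as PullServerUtil does
        }
      }
    }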

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
deleted file mode 100644
index d030eed..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
-import org.apache.tajo.util.StringUtils;
-
-public class TajoPullServer extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(TajoPullServer.class);
-
-  private TajoPullServerService pullService;
-  private TajoConf systemConf;
-
-  public TajoPullServer() {
-    super(TajoPullServer.class.getName());
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    this.systemConf = (TajoConf)conf;
-    pullService = new TajoPullServerService();
-    addService(pullService);
-
-    super.init(conf);
-  }
-
-  public void startPullServer(TajoConf systemConf) {
-    init(systemConf);
-    start();
-  }
-
-  public void start() {
-    super.start();
-
-  }
-
-  public static void main(String[] args) throws Exception {
-    StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
-
-    if (!TajoPullServerService.isStandalone()) {
-      LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
-      return;
-    }
-
-    TajoConf tajoConf = new TajoConf();
-    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
-
-    (new TajoPullServer()).startPullServer(tajoConf);
-  }
-}
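
TajoPullServer's main() refuses to start unless TAJO_PULLSERVER_STANDALONE is set, a flag
that TajoPullServerService captures once in a static initializer (shown in the next file).
The gate reduces to this small sketch (the class name StandaloneGate is illustrative):

    public class StandaloneGate {
      // Same check as TajoPullServerService.isStandalone(): the env variable must be
      // present and equal to "true", compared case-insensitively.
      static boolean isStandalone() {
        String v = System.getenv("TAJO_PULLSERVER_STANDALONE");
        return v != null && v.equalsIgnoreCase("true");
      }

      public static void main(String[] args) {
        if (!isStandalone()) {
          System.err.println("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
          return; // mirrors the LOG.fatal + return in TajoPullServer.main
        }
        // ... init and start the pull server service here ...
      }
    }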

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
deleted file mode 100644
index 5a4e69f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ /dev/null
@@ -1,808 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.*;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class TajoPullServerService extends AbstractService {
-
-  private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
-
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
-  private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
-  private int sslFileBufferSize;
-
-  private ApplicationId appId;
-  private FileSystem localFS;
-
-  /**
-   * Should the shuffle use posix_fadvise calls to manage the OS cache during
-   * sendfile
-   */
-  private boolean manageOsCache;
-  private int readaheadLength;
-  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-
-
-  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private String userName;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "tajo.pullserver.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
-  private static boolean STANDALONE = false;
-
-  static {
-    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    if (!StringUtils.isEmpty(standalone)) {
-      STANDALONE = standalone.equalsIgnoreCase("true");
-    }
-  }
-
-  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
-  static class ShuffleMetrics implements ChannelFutureListener {
-    @Metric({"OutputBytes","PullServer output in bytes"})
-    MutableCounterLong shuffleOutputBytes;
-    @Metric({"Failed","# of failed shuffle outputs"})
-    MutableCounterInt shuffleOutputsFailed;
-    @Metric({"Succeeded","# of succeeded shuffle outputs"})
-    MutableCounterInt shuffleOutputsOK;
-    @Metric({"Connections","# of current shuffle connections"})
-    MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
-
-  final ShuffleMetrics metrics;
-
-  TajoPullServerService(MetricsSystem ms) {
-    super("httpshuffle");
-    metrics = ms.register(new ShuffleMetrics());
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public TajoPullServerService() {
-    this(DefaultMetricsSystem.instance());
-  }
-
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplicationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by PullServerAuxService.
-   * @param meta the metadata returned by the PullServerAuxService
-   * @return the port the PullServer Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    return in.readInt();
-  }
-
-  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
-    // TODO these bytes should be versioned
-  // TODO: Once Shuffle is out of NM, this can use MR APIs
-    this.appId = appId;
-    this.userName = user;
-    userRsrc.put(appId.toString(), user);
-  }
-
-  public void stopApp(ApplicationId appId) {
-    userRsrc.remove(appId.toString());
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    try {
-      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-          DEFAULT_SHUFFLE_READAHEAD_BYTES);
-
-      int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
-          Runtime.getRuntime().availableProcessors() * 2);
-
-      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
-
-      localFS = new LocalFileSystem();
-
-      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
-          , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
-      super.init(conf);
-      LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
-    } catch (Throwable t) {
-      LOG.error(t);
-    }
-  }
-
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
-
-    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-        ConfVars.PULLSERVER_PORT.defaultIntVal);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
-
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
-    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    pipelineFact.PullServer.setPort(port);
-    LOG.info(getName() + " listening on port " + port);
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-
-
-    if (STANDALONE) {
-      File pullServerPortFile = getPullServerPortFile();
-      if (pullServerPortFile.exists()) {
-        pullServerPortFile.delete();
-      }
-      pullServerPortFile.getParentFile().mkdirs();
-      LOG.info("Write PullServerPort to " + pullServerPortFile);
-      FileOutputStream out = null;
-      try {
-        out = new FileOutputStream(pullServerPortFile);
-        out.write(("" + port).getBytes());
-      } catch (Exception e) {
-        LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
-            ", " + e.getMessage(), e);
-        System.exit(-1);
-      } finally {
-        IOUtils.closeStream(out);
-      }
-    }
-    super.serviceInit(conf);
-    LOG.info("TajoPullServerService started: port=" + port);
-  }
-
-  public static boolean isStandalone() {
-    return STANDALONE;
-  }
-
-  private static File getPullServerPortFile() {
-    String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR");
-    if (StringUtils.isEmpty(pullServerPortInfoFile)) {
-      pullServerPortInfoFile = "/tmp";
-    }
-    return new File(pullServerPortInfoFile + "/pullserver.port");
-  }
-
-  // TODO change to get port from master or tajoConf
-  public static int readPullServerPort() {
-    FileInputStream in = null;
-    try {
-      File pullServerPortFile = getPullServerPortFile();
-
-      if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
-        return -1;
-      }
-      in = new FileInputStream(pullServerPortFile);
-      byte[] buf = new byte[1024];
-      int readBytes = in.read(buf);
-      return Integer.parseInt(new String(buf, 0, readBytes));
-    } catch (IOException e) {
-      LOG.fatal(e.getMessage(), e);
-      return -1;
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public synchronized void stop() {
-    try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
-      pipelineFact.destroy();
-
-      localFS.close();
-    } catch (Throwable t) {
-      LOG.error(t);
-    } finally {
-      super.stop();
-    }
-  }
-
-  public synchronized ByteBuffer getMeta() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final PullServer PullServer;
-    private SSLFactory sslFactory;
-
-    public HttpPipelineFactory(Configuration conf) throws Exception {
-      PullServer = new PullServer(conf);
-      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
-          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-
-      int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
-          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
-      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", PullServer);
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-  }
-
-
-  Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
-
-  public void completeFileChunk(FileRegion filePart,
-                                   String requestUri,
-                                   long startTime) {
-    ProcessingStatus status = processingStatusMap.get(requestUri);
-    if (status != null) {
-      status.decrementRemainFiles(filePart, startTime);
-    }
-  }
-
-  class ProcessingStatus {
-    String requestUri;
-    int numFiles;
-    AtomicInteger remainFiles;
-    long startTime;
-    long makeFileListTime;
-    long minTime = Long.MAX_VALUE;
-    long maxTime;
-    int numSlowFile;
-
-    public ProcessingStatus(String requestUri) {
-      this.requestUri = requestUri;
-      this.startTime = System.currentTimeMillis();
-    }
-
-    public void setNumFiles(int numFiles) {
-      this.numFiles = numFiles;
-      this.remainFiles = new AtomicInteger(numFiles);
-    }
-    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
-      synchronized(remainFiles) {
-        long fileSendTime = System.currentTimeMillis() - fileStartTime;
-        if (fileSendTime > 20 * 1000) {
-          LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
-          numSlowFile++;
-        }
-        if (fileSendTime > maxTime) {
-          maxTime = fileSendTime;
-        }
-        if (fileSendTime < minTime) {
-          minTime = fileSendTime;
-        }
-        int remain = remainFiles.decrementAndGet();
-        if (remain <= 0) {
-          processingStatusMap.remove(requestUri);
-          LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
-              "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
-              "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
-        }
-      }
-    }
-  }
-
-  class PullServer extends SimpleChannelUpstreamHandler {
-
-    private final Configuration conf;
-//    private final IndexCache indexCache;
-    private final LocalDirAllocator lDirAlloc =
-      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-    private int port;
-
-    public PullServer(Configuration conf) throws IOException {
-      this.conf = conf;
-//      indexCache = new IndexCache(new JobConf(conf));
-      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-          ConfVars.PULLSERVER_PORT.defaultIntVal);
-
-      // init local temporal dir
-      lDirAlloc.getAllLocalPathsToRead(".", conf);
-    }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
-        throws Exception {
-
-      accepted.add(evt.getChannel());
-      LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
-      super.channelOpen(ctx, evt);
-
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != GET) {
-        sendError(ctx, METHOD_NOT_ALLOWED);
-        return;
-      }
-
-      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
-      processingStatusMap.put(request.getUri().toString(), processingStatus);
-      // Parsing the URL into key-values
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(request.getUri()).getParameters();
-      final List<String> types = params.get("type");
-      final List<String> qids = params.get("qid");
-      final List<String> taskIdList = params.get("ta");
-      final List<String> subQueryIds = params.get("sid");
-      final List<String> partIds = params.get("p");
-      final List<String> offsetList = params.get("offset");
-      final List<String> lengthList = params.get("length");
-
-      if (types == null || subQueryIds == null || qids == null || partIds == null) {
-        sendError(ctx, "Required queryId, type, subquery Id, and part id",
-            BAD_REQUEST);
-        return;
-      }
-
-      if (qids.size() != 1 || types.size() != 1 || subQueryIds.size() != 1) {
-        sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id",
-            BAD_REQUEST);
-        return;
-      }
-
-      String partId = partIds.get(0);
-      String queryId = qids.get(0);
-      String shuffleType = types.get(0);
-      String sid = subQueryIds.get(0);
-
-      long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-      long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-      if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
-        sendError(ctx, "Required taskIds", BAD_REQUEST);
-      }
-
-      List<String> taskIds = splitMaps(taskIdList);
-
-      String queryBaseDir = queryId.toString() + "/output";
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("PullServer request param: shuffleType=" + shuffleType +
-            ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
-
-        // the working dir of tajo worker for each query
-        LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
-      }
-
-      final List<FileChunk> chunks = Lists.newArrayList();
-
-      // if a subquery requires a range shuffle
-      if (shuffleType.equals("r")) {
-        String ta = taskIds.get(0);
-        if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
-          LOG.warn("Range shuffle output not found: " + queryBaseDir + "/" + sid + "/" + ta + "/output/");
-          sendError(ctx, NO_CONTENT);
-          return;
-        }
-        Path path = localFS.makeQualified(
-            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
-        String startKey = params.get("start").get(0);
-        String endKey = params.get("end").get(0);
-        boolean last = params.get("final") != null;
-
-        FileChunk chunk;
-        try {
-          chunk = getFileCunks(path, startKey, endKey, last);
-        } catch (Throwable t) {
-          LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
-          return;
-        }
-        if (chunk != null) {
-          chunks.add(chunk);
-        }
-
-        // if a subquery requires a hash shuffle or a scattered hash shuffle
-      } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-        int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
-        String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-        if (!lDirAlloc.ifExists(partPath, conf)) {
-          LOG.warn("Partition shuffle file not exists: " + partPath);
-          sendError(ctx, NO_CONTENT);
-          return;
-        }
-
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
-
-        File file = new File(path.toUri());
-        long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-        long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
-        if (startPos >= file.length()) {
-          String errorMessage = "Start pos[" + startPos + "] greater than file length [" + file.length() + "]";
-          LOG.error(errorMessage);
-          sendError(ctx, errorMessage, BAD_REQUEST);
-          return;
-        }
-        LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
-        FileChunk chunk = new FileChunk(file, startPos, readLen);
-        chunks.add(chunk);
-      } else {
-        LOG.error("Unknown shuffle type: " + shuffleType);
-        sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
-        return;
-      }
-
-      processingStatus.setNumFiles(chunks.size());
-      processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
-      // Write the content.
-      Channel ch = e.getChannel();
-      if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-        ch.write(response);
-        if (!isKeepAlive(request)) {
-          ch.close();
-        }
-      }  else {
-        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-        long totalSize = 0;
-        for (FileChunk chunk : file) {
-          totalSize += chunk.length();
-        }
-        setContentLength(response, totalSize);
-
-        // Write the initial line and the header.
-        ch.write(response);
-
-        ChannelFuture writeFuture = null;
-
-        for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
-          if (writeFuture == null) {
-            sendError(ctx, NOT_FOUND);
-            return;
-          }
-        }
-
-        // Decide whether to close the connection or not.
-        if (!isKeepAlive(request)) {
-          // Close the connection when the whole content is written out.
-          writeFuture.addListener(ChannelFutureListener.CLOSE);
-        }
-      }
-    }
-
-    private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   Channel ch,
-                                   FileChunk file,
-                                   String requestUri) throws IOException {
-      long startTime = System.currentTimeMillis();
-      RandomAccessFile spill = null;
-      ChannelFuture writeFuture;
-      try {
-        spill = new RandomAccessFile(file.getFile(), "r");
-        if (ch.getPipeline().get(SslHandler.class) == null) {
-          final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
-              file.startOffset(), file.length(), manageOsCache, readaheadLength,
-              readaheadPool, file.getFile().getAbsolutePath());
-          writeFuture = ch.write(filePart);
-          writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
-        } else {
-          // HTTPS cannot be done with zero copy.
-          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-              file.startOffset(), file.length(), sslFileBufferSize,
-              manageOsCache, readaheadLength, readaheadPool,
-              file.getFile().getAbsolutePath());
-          writeFuture = ch.write(chunk);
-        }
-      } catch (FileNotFoundException e) {
-        LOG.info(file.getFile() + " not found");
-        return null;
-      } catch (Throwable e) {
-        if (spill != null) {
-          // should close the file opened above
-          spill.close();
-        }
-        return null;
-      }
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
-      return writeFuture;
-    }
-
-    private void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    private void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-
-      // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      LOG.error(e.getCause().getMessage(), e.getCause());
-      // if channel.close() is not called, files opened for this request are never closed
-      if (ctx.getChannel().isConnected()){
-        ctx.getChannel().close();
-      }
-    }
-  }
-
-  public static FileChunk getFileCunks(Path outDir,
-                                      String startKey,
-                                      String endKey,
-                                      boolean last) throws IOException {
-    BSTIndex index = new BSTIndex(new TajoConf());
-    BSTIndex.BSTIndexReader idxReader =
-        index.getIndexReader(new Path(outDir, "index"));
-    idxReader.open();
-    Schema keySchema = idxReader.getKeySchema();
-    TupleComparator comparator = idxReader.getComparator();
-
-    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
-          + idxReader.getLastKey() + ")");
-
-    File data = new File(URI.create(outDir.toUri() + "/output"));
-    byte [] startBytes = Base64.decodeBase64(startKey);
-    byte [] endBytes = Base64.decodeBase64(endKey);
-
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-    Tuple start;
-    Tuple end;
-    try {
-      start = decoder.toTuple(startBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("StartKey: " + startKey
-          + ", decoded byte size: " + startBytes.length, t);
-    }
-
-    try {
-      end = decoder.toTuple(endBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("EndKey: " + endKey
-          + ", decoded byte size: " + endBytes.length, t);
-    }
-
-    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
-        (last ? ", last=true" : "") + ")");
-
-    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-      LOG.info("The index has no contents");
-      idxReader.close();
-      return null;
-    }
-
-    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-        comparator.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-          "], but request start: " + start + ", end: " + end + ")");
-      idxReader.close();
-      return null;
-    }
-
-    long startOffset;
-    long endOffset;
-    try {
-      startOffset = idxReader.find(start);
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey() + ")");
-      throw ioe;
-    }
-    try {
-      endOffset = idxReader.find(end);
-      if (endOffset == -1) {
-        endOffset = idxReader.find(end, true);
-      }
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey() + ")");
-      throw ioe;
-    }
-
-    // if startOffset == -1 then case 2-1 or case 3
-    if (startOffset == -1) { // this is a hack
-      // if case 2-1 or case 3
-      try {
-        startOffset = idxReader.find(start, true);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end +")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey() + ")");
-        throw ioe;
-      }
-    }
-
-    if (startOffset == -1) {
-      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-          "State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey() + ")");
-    }
-
-    // if greater than indexed values
-    if (last || (endOffset == -1
-        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-      endOffset = data.length();
-    }
-
-    idxReader.close();
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-    LOG.info("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-}
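
For orientation, here is a minimal sketch of the calling contract of getFileCunks above; the wrapper method and variable names are invented for illustration. The start/end arguments are Base64-encoded, row-store-encoded key tuples (the method decodes them itself), a null result means the index is empty or the requested range lies outside the indexed keys, and a non-null chunk delimits the byte range of the "output" file to serve.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.pullserver.retriever.FileChunk;

    static FileChunk lookupRange(Path outDir, String startB64, String endB64,
                                 boolean last) throws IOException {
      FileChunk chunk =
          TajoPullServerService.getFileCunks(outDir, startB64, endB64, last);
      if (chunk == null) {
        return null;   // empty index, or [start, end) outside the indexed keys
      }
      // chunk.startOffset() and chunk.length() delimit the bytes of
      // outDir/output that should be written to the client
      return chunk;
    }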

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index 67e7423..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.pullserver.FileAccessForbiddenException;
-import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
-  private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
-  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
-  public AdvancedDataRetriever() {
-  }
-  
-  public void register(String taskAttemptId, RetrieverHandler handler) {
-    synchronized (handlerMap) {
-      if (!handlerMap.containsKey(taskAttemptId)) {
-        handlerMap.put(taskAttemptId, handler);
-      }
-    } 
-  }
-  
-  public void unregister(String taskAttemptId) {
-    synchronized (handlerMap) {
-      if (handlerMap.containsKey(taskAttemptId)) {
-        handlerMap.remove(taskAttemptId);
-      }
-    }
-  }
-
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-
-    final Map<String, List<String>> params =
-      new QueryStringDecoder(request.getUri()).getParameters();
-
-    if (!params.containsKey("qid")) {
-      throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
-    }
-
-    if (params.containsKey("sid")) {
-      List<FileChunk> chunks = Lists.newArrayList();
-      List<String> queryUnidIds = splitMaps(params.get("qid"));
-      for (String eachQueryUnitId : queryUnidIds) {
-        String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
-        ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
-        QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
-
-        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
-
-        RetrieverHandler handler = handlerMap.get(attemptId.toString());
-        FileChunk chunk = handler.get(params);
-        chunks.add(chunk);
-      }
-      return chunks.toArray(new FileChunk[chunks.size()]);
-    } else {
-      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
-      FileChunk chunk = handler.get(params);
-      if (chunk == null) { // no content corresponds to the given qid
-        return null;
-      }
-
-      File file = chunk.getFile();
-      if (file.isHidden() || !file.exists()) {
-        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
-      }
-      if (!file.isFile()) {
-        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not a regular file");
-      }
-
-      return new FileChunk[] {chunk};
-    }
-  }
-
-  private List<String> splitMaps(List<String> qids) {
-    if (null == qids) {
-      LOG.error("QueryUnitId is EMPTY");
-      return null;
-    }
-
-    final List<String> ret = new ArrayList<String>();
-    for (String qid : qids) {
-      Collections.addAll(ret, qid.split(","));
-    }
-    return ret;
-  }
-}
\ No newline at end of file
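
A small usage sketch of AdvancedDataRetriever; the handler class and the id are invented for illustration. A task attempt's output becomes fetchable after register() and stops being served after unregister():

    AdvancedDataRetriever retriever = new AdvancedDataRetriever();

    RetrieverHandler handler = new MyRangeHandler();      // hypothetical implementation
    String attemptId = queryUnitAttemptId.toString();     // e.g. from a running task

    retriever.register(attemptId, handler);
    // requests carrying qid/sid parameters are answered through
    // retriever.handle(ctx, request) while the attempt is registered
    retriever.unregister(attemptId);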

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
deleted file mode 100644
index 8f55f7b..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
-  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index dc63929..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.pullserver.FileAccessForbiddenException;
-import org.apache.tajo.pullserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
-  public String baseDir;
-  
-  public DirectoryRetriever(String baseDir) {
-    this.baseDir = baseDir;
-  }
-
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
-    if (path == null) {
-      throw new IllegalArgumentException("Wrong path: " +path);
-    }
-
-    File file = new File(baseDir, path);
-    if (file.isHidden() || !file.exists()) {
-      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
-    }
-    if (!file.isFile()) {
-      throw new FileAccessForbiddenException("No such file: "
-          + baseDir + "/" + path); 
-    }
-    
-    return new FileChunk[] {new FileChunk(file, 0, file.length())};
-  }
-}
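
DirectoryRetriever, by contrast, needs no registration; a sketch with an illustrative base directory:

    // serves any regular, non-hidden file below baseDir; the request URI is
    // first cleaned by HttpDataServerHandler.sanitizeUri()
    DataRetriever retriever = new DirectoryRetriever("/tmp/tajo-local/output");
    FileChunk[] chunks = retriever.handle(ctx, request);
    // each chunk spans a whole file: offset 0, length = file.length()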

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
deleted file mode 100644
index 67cff21..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
-  private final File file;
-  private final long startOffset;
-  private long length;
-
-  /**
-   * True if this.file was created from data fetched from a remote host (e.g., via an HttpRequest); false otherwise.
-   */
-  private boolean fromRemote;
-
-  /**
-   * ExecutionBlockId
-   */
-  private String ebId;
-
-  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
-    this.file = file;
-    this.startOffset = startOffset;
-    this.length = length;
-  }
-
-  public File getFile() {
-    return this.file;
-  }
-
-  public long startOffset() {
-    return this.startOffset;
-  }
-
-  public long length() {
-    return this.length;
-  }
-
-  public void setLength(long newLength) {
-    this.length = newLength;
-  }
-
-  public boolean fromRemote() {
-    return this.fromRemote;
-  }
-
-  public void setFromRemote(boolean newVal) {
-    this.fromRemote = newVal;
-  }
-
-  public String getEbId() {
-    return this.ebId;
-  }
-
-  public void setEbId(String newVal) {
-    this.ebId = newVal;
-  }
-
-  public String toString() {
-    return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
-	+ file.getAbsolutePath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
deleted file mode 100644
index 5567c0d..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
-  /**
-   *
-   * @param kvs url-decoded key/value pairs
-   * @return a desired part of a file
-   * @throws java.io.IOException
-   */
-  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}


[3/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.

Posted by hy...@apache.org.
TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.

Closes #284


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b5aa7804
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b5aa7804
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b5aa7804

Branch: refs/heads/master
Commit: b5aa780460fcfbf657541ee6c94d41b34b1b24b9
Parents: facd1dd
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Dec 8 17:27:16 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 8 17:27:16 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 pom.xml                                         |   2 +-
 tajo-pullserver/pom.xml                         | 146 ++++
 .../tajo/pullserver/FadvisedChunkedFile.java    |  81 ++
 .../tajo/pullserver/FadvisedFileRegion.java     | 170 ++++
 .../FileAccessForbiddenException.java           |  40 +
 .../tajo/pullserver/FileCloseListener.java      |  53 ++
 .../tajo/pullserver/HttpDataServerHandler.java  | 245 ++++++
 .../HttpDataServerPipelineFactory.java          |  56 ++
 .../org/apache/tajo/pullserver/HttpUtil.java    |  69 ++
 .../tajo/pullserver/PullServerAuxService.java   | 654 +++++++++++++++
 .../apache/tajo/pullserver/PullServerUtil.java  |  90 +++
 .../apache/tajo/pullserver/TajoPullServer.java  |  73 ++
 .../tajo/pullserver/TajoPullServerService.java  | 808 +++++++++++++++++++
 .../retriever/AdvancedDataRetriever.java        | 126 +++
 .../pullserver/retriever/DataRetriever.java     |  29 +
 .../retriever/DirectoryRetriever.java           |  56 ++
 .../tajo/pullserver/retriever/FileChunk.java    |  81 ++
 .../pullserver/retriever/RetrieverHandler.java  |  33 +
 tajo-yarn-pullserver/pom.xml                    | 146 ----
 .../tajo/pullserver/FadvisedChunkedFile.java    |  81 --
 .../tajo/pullserver/FadvisedFileRegion.java     | 170 ----
 .../FileAccessForbiddenException.java           |  40 -
 .../tajo/pullserver/FileCloseListener.java      |  53 --
 .../tajo/pullserver/HttpDataServerHandler.java  | 245 ------
 .../HttpDataServerPipelineFactory.java          |  56 --
 .../org/apache/tajo/pullserver/HttpUtil.java    |  69 --
 .../tajo/pullserver/PullServerAuxService.java   | 654 ---------------
 .../apache/tajo/pullserver/PullServerUtil.java  |  90 ---
 .../apache/tajo/pullserver/TajoPullServer.java  |  73 --
 .../tajo/pullserver/TajoPullServerService.java  | 808 -------------------
 .../retriever/AdvancedDataRetriever.java        | 126 ---
 .../pullserver/retriever/DataRetriever.java     |  29 -
 .../retriever/DirectoryRetriever.java           |  56 --
 .../tajo/pullserver/retriever/FileChunk.java    |  81 --
 .../pullserver/retriever/RetrieverHandler.java  |  33 -
 36 files changed, 2814 insertions(+), 2811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index acc72b9..c84992b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -154,6 +154,9 @@ Release 0.9.1 - unreleased
 
   TASKS
 
+    TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.
+    (hyunsik)
+
     TAJO-1157: Required Java version in tutorial doc needs to be updated.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3dca9c0..62e03f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
     <module>tajo-client</module>
     <module>tajo-jdbc</module>
     <module>tajo-storage</module>
-    <module>tajo-yarn-pullserver</module>
+    <module>tajo-pullserver</module>
     <module>tajo-dist</module>
     <module>tajo-thirdparty/asm</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
new file mode 100644
index 0000000..a7644a1
--- /dev/null
+++ b/tajo-pullserver/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <name>Tajo Core PullServer</name>
+  <artifactId>tajo-yarn-pullserver</artifactId>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-rpc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
new file mode 100644
index 0000000..b0b8d18
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}
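
A sketch of how this class is used on the HTTPS path (compare sendFile in TajoPullServerService, where zero-copy is unavailable); the file path, sizes, and the channel variable ch are illustrative:

    import java.io.File;
    import java.io.RandomAccessFile;
    import org.apache.hadoop.io.ReadaheadPool;

    File file = new File("/tmp/spill.out");
    RandomAccessFile spill = new RandomAccessFile(file, "r");
    FadvisedChunkedFile chunk = new FadvisedChunkedFile(
        spill, 0, spill.length(),        // position, count
        60 * 1024,                       // chunk size, e.g. the SSL file buffer size
        true,                            // manageOsCache
        4 * 1024 * 1024,                 // readahead length
        ReadaheadPool.getInstance(),
        file.getAbsolutePath());         // identifier for readahead/fadvise
    ch.write(chunk);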

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
new file mode 100644
index 0000000..18cf4b6
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier) throws IOException {
+    this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+  }
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier, int shuffleBufferSize,
+                            boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    }
+  }
+
+  /**
+   * Transfers data through a local buffer: data is read from disk into an
+   * in-memory buffer and then written from the buffer to the target. This
+   * path is used only when transferTo is disallowed in the configuration.
+   * super.transferTo does not perform well on Windows because of the small
+   * IO requests it generates; customShuffleTransfer controls the IO request
+   * size via the size of the intermediate buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+              " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if actualCount is not a multiple of
+        //the byteBuffer size and the file is big enough. In that case we
+        //cannot use flip; we set the buffer limit manually to trans instead.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans;
+        trans = 0;
+      }
+
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+
+      byteBuffer.clear();
+    }
+
+    return actualCount - trans;
+  }
+
+
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    super.releaseExternalResources();
+  }
+
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore.
+   */
+  public void transferSuccessful() {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}
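
The nine-argument constructor exposes the switch described in the javadoc above; a sketch that forces the buffered path (imports as in the previous sketch, sizes illustrative):

    RandomAccessFile spill = new RandomAccessFile(file, "r");
    FadvisedFileRegion region = new FadvisedFileRegion(
        spill, 0, spill.length(),
        true,                            // manageOsCache
        4 * 1024 * 1024,                 // readahead length
        ReadaheadPool.getInstance(),
        file.getAbsolutePath(),
        FadvisedFileRegion.DEFAULT_SHUFFLE_BUFFER_SIZE,
        false);                          // disallow transferTo: use customShuffleTransfer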

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
new file mode 100644
index 0000000..c703f6f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import java.io.IOException;
+
+public class FileAccessForbiddenException extends IOException {
+  private static final long serialVersionUID = -3383272565826389213L;
+
+  public FileAccessForbiddenException() {
+  }
+
+  public FileAccessForbiddenException(String message) {
+    super(message);
+  }
+
+  public FileAccessForbiddenException(Throwable cause) {
+    super(cause);
+  }
+
+  public FileAccessForbiddenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
new file mode 100644
index 0000000..236db89
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+public class FileCloseListener implements ChannelFutureListener {
+
+  private FadvisedFileRegion filePart;
+  private String requestUri;
+  private TajoPullServerService pullServerService;
+  private long startTime;
+
+  public FileCloseListener(FadvisedFileRegion filePart,
+                           String requestUri,
+                           long startTime,
+                           TajoPullServerService pullServerService) {
+    this.filePart = filePart;
+    this.requestUri = requestUri;
+    this.pullServerService = pullServerService;
+    this.startTime = startTime;
+  }
+
+  // TODO error handling; distinguish IO/connection failures,
+  //      attribute to appropriate spill output
+  @Override
+  public void operationComplete(ChannelFuture future) {
+    if(future.isSuccess()){
+      filePart.transferSuccessful();
+    }
+    filePart.releaseExternalResources();
+    if (pullServerService != null) {
+      pullServerService.completeFileChunk(filePart, requestUri, startTime);
+    }
+  }
+}
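
The listener is attached to the write future of a zero-copy transfer, as sendFile in TajoPullServerService does; filePart, request, and pullServerService are assumed to be in scope:

    ChannelFuture writeFuture = ch.write(filePart);
    writeFuture.addListener(new FileCloseListener(
        filePart, request.getUri(), System.currentTimeMillis(), pullServerService));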

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
new file mode 100644
index 0000000..31db15c
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.pullserver.retriever.DataRetriever;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.*;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+  private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
+
+  Map<ExecutionBlockId, DataRetriever> retrievers =
+      new ConcurrentHashMap<ExecutionBlockId, DataRetriever>();
+  private String userName;
+  private String appId;
+
+  public HttpDataServerHandler(String userName, String appId) {
+    this.userName = userName;
+    this.appId = appId;
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    HttpRequest request = (HttpRequest) e.getMessage();
+    if (request.getMethod() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+
+    String base =
+        ContainerLocalizer.USERCACHE + "/" + userName + "/"
+            + ContainerLocalizer.APPCACHE + "/"
+            + appId + "/output" + "/";
+
+    final Map<String, List<String>> params =
+        new QueryStringDecoder(request.getUri()).getParameters();
+
+    List<FileChunk> chunks = Lists.newArrayList();
+    List<String> taskIds = splitMaps(params.get("ta"));
+    int sid = Integer.valueOf(params.get("sid").get(0));
+    int partitionId = Integer.valueOf(params.get("p").get(0));
+    for (String ta : taskIds) {
+
+      File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId);
+      FileChunk chunk = new FileChunk(file, 0, file.length());
+      chunks.add(chunk);
+    }
+
+    FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+//    try {
+//      file = retriever.handle(ctx, request);
+//    } catch (FileNotFoundException fnf) {
+//      LOG.error(fnf);
+//      sendError(ctx, NOT_FOUND);
+//      return;
+//    } catch (IllegalArgumentException iae) {
+//      LOG.error(iae);
+//      sendError(ctx, BAD_REQUEST);
+//      return;
+//    } catch (FileAccessForbiddenException fafe) {
+//      LOG.error(fafe);
+//      sendError(ctx, FORBIDDEN);
+//      return;
+//    } catch (IOException ioe) {
+//      LOG.error(ioe);
+//      sendError(ctx, INTERNAL_SERVER_ERROR);
+//      return;
+//    }
+
+    // Write the content.
+    Channel ch = e.getChannel();
+    if (file == null) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+      ch.write(response);
+      if (!isKeepAlive(request)) {
+        ch.close();
+      }
+    } else {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      long totalSize = 0;
+      for (FileChunk chunk : file) {
+        totalSize += chunk.length();
+      }
+      setContentLength(response, totalSize);
+
+      // Write the initial line and the header.
+      ch.write(response);
+
+      ChannelFuture writeFuture = null;
+
+      for (FileChunk chunk : file) {
+        writeFuture = sendFile(ctx, ch, chunk);
+        if (writeFuture == null) {
+          sendError(ctx, NOT_FOUND);
+          return;
+        }
+      }
+
+      // Decide whether to close the connection or not.
+      if (!isKeepAlive(request)) {
+        // Close the connection when the whole content is written out.
+        writeFuture.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
+  }
+
+  private ChannelFuture sendFile(ChannelHandlerContext ctx,
+                                 Channel ch,
+                                 FileChunk file) throws IOException {
+    RandomAccessFile raf;
+    try {
+      raf = new RandomAccessFile(file.getFile(), "r");
+    } catch (FileNotFoundException fnfe) {
+      return null;
+    }
+
+    ChannelFuture writeFuture;
+    if (ch.getPipeline().get(SslHandler.class) != null) {
+      // Cannot use zero-copy with HTTPS.
+      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
+          file.length(), 8192));
+    } else {
+      // No encryption - use zero-copy.
+      final FileRegion region = new DefaultFileRegion(raf.getChannel(),
+          file.startOffset(), file.length());
+      writeFuture = ch.write(region);
+      writeFuture.addListener(new ChannelFutureListener() {
+        public void operationComplete(ChannelFuture future) {
+          region.releaseExternalResources();
+        }
+      });
+    }
+
+    return writeFuture;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+      throws Exception {
+    Channel ch = e.getChannel();
+    Throwable cause = e.getCause();
+    if (cause instanceof TooLongFrameException) {
+      sendError(ctx, BAD_REQUEST);
+      return;
+    }
+
+    LOG.error(cause.getMessage(), cause);
+    if (ch.isConnected()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  public static String sanitizeUri(String uri) {
+    // Decode the path.
+    try {
+      uri = URLDecoder.decode(uri, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      try {
+        uri = URLDecoder.decode(uri, "ISO-8859-1");
+      } catch (UnsupportedEncodingException e1) {
+        throw new Error("Failed to decode URI with UTF-8 or ISO-8859-1", e1);
+      }
+    }
+
+    // Convert file separators.
+    uri = uri.replace('/', File.separatorChar);
+
+    // Simplistic security check against path traversal; a production
+    // deployment needs something more rigorous.
+    if (uri.contains(File.separator + ".")
+        || uri.contains("." + File.separator) || uri.startsWith(".")
+        || uri.endsWith(".")) {
+      return null;
+    }
+
+    // Convert to absolute path.
+    return uri;
+  }
+
+  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.setContent(ChannelBuffers.copiedBuffer(
+        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+
+    // Close the connection as soon as the error message is sent.
+    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+  }
+
+  private List<String> splitMaps(List<String> qids) {
+    if (null == qids) {
+      LOG.error("QueryUnitId is EMPTY");
+      return null;
+    }
+
+    final List<String> ret = new ArrayList<String>();
+    for (String qid : qids) {
+      Collections.addAll(ret, qid.split(","));
+    }
+    return ret;
+  }
+}
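
A request this handler accepts looks like the following (host, port, and ids are illustrative). ta carries comma-separated <taskId>_<attemptId> pairs, sid the execution block id, and p the partition id; each pair is resolved to usercache/<user>/appcache/<appId>/output/<sid>/<ta>/output/<p> on local disk:

    GET http://worker01:28080/?sid=2&p=0&ta=5_0,7_1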

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
new file mode 100644
index 0000000..4c8bd8b
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
+  private String userName;
+  private String appId;
+  public HttpDataServerPipelineFactory(String userName, String appId) {
+    this.userName = userName;
+    this.appId = appId;
+  }
+
+  public ChannelPipeline getPipeline() throws Exception {
+    // Create a default pipeline implementation.
+    ChannelPipeline pipeline = pipeline();
+
+    // Uncomment the following line if you want HTTPS
+    // SSLEngine engine =
+    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    // engine.setUseClientMode(false);
+    // pipeline.addLast("ssl", new SslHandler(engine));
+
+    pipeline.addLast("decoder", new HttpRequestDecoder());
+    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+    pipeline.addLast("encoder", new HttpResponseEncoder());
+    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+    pipeline.addLast("deflater", new HttpContentCompressor());
+    pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
+    return pipeline;
+  }
+}
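
A minimal sketch of installing this factory in a Netty 3 NIO server bootstrap; the port, user name, and application id are illustrative:

    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

    ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
    bootstrap.setPipelineFactory(
        new HttpDataServerPipelineFactory("tajo", "application_1417681401399_0001"));
    bootstrap.bind(new InetSocketAddress(28080));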

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
new file mode 100644
index 0000000..2cbb101
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.Map;
+
+public class HttpUtil {
+  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
+    return getParamsFromQuery(uri.getQuery());
+  }
+
+  /**
+   * Parses a query string into key/value pairs.
+   *
+   * @param queryString decoded query string
+   * @return key/value pairs parsed from a given query string
+   * @throws java.io.UnsupportedEncodingException
+   */
+  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+    String [] queries = queryString.split("&");
+
+    Map<String,String> params = Maps.newHashMap();
+    String [] param;
+    for (String q : queries) {
+      param = q.split("=");
+      params.put(param[0], param[1]);
+    }
+
+    return params;
+  }
+
+  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
+    StringBuilder sb = new StringBuilder();
+
+    boolean first = true;
+    for (Map.Entry<String,String> param : params.entrySet()) {
+      if (!first) {
+        sb.append("&");
+      }
+      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+          append("=").
+          append(URLEncoder.encode(param.getValue(), "UTF-8"));
+      first = false;
+    }
+
+    return sb.toString();
+  }
+}

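Note that getParamsFromQuery() splits on '&' and '=' without URL-decoding and keeps only the last value of a repeated key, while buildQuery() URL-encodes both sides. A minimal round-trip sketch, assuming keys and values that need no decoding (the parameters are invented):

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> params = new HashMap<String, String>();
    params.put("type", "h");
    params.put("sid", "2");
    String qs = HttpUtil.buildQuery(params);              // e.g. "type=h&sid=2"
    Map<String, String> parsed = HttpUtil.getParamsFromQuery(qs);
    // parsed.get("sid") -> "2"; pair order is not guaranteed
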
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
new file mode 100644
index 0000000..1c63c8a
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class PullServerAuxService extends AuxiliaryService {
+
+  private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private HttpPipelineFactory pipelineFact;
+  private int sslFileBufferSize;
+
+  private ApplicationId appId;
+  private QueryId queryId;
+  private FileSystem localFS;
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+   
+
+  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+  private static final Map<String,String> userRsrc =
+    new ConcurrentHashMap<String,String>();
+  private static String userName;
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "tajo.pullserver.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+  static class ShuffleMetrics implements ChannelFutureListener {
+    @Metric({"OutputBytes","PullServer output in bytes"})
+    MutableCounterLong shuffleOutputBytes;
+    @Metric({"Failed","# of failed shuffle outputs"})
+    MutableCounterInt shuffleOutputsFailed;
+    @Metric({"Succeeded","# of succeeded shuffle outputs"})
+    MutableCounterInt shuffleOutputsOK;
+    @Metric({"Connections","# of current shuffle connections"})
+    MutableGaugeInt shuffleConnections;
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isSuccess()) {
+        shuffleOutputsOK.incr();
+      } else {
+        shuffleOutputsFailed.incr();
+      }
+      shuffleConnections.decr();
+    }
+  }
+
+  final ShuffleMetrics metrics;
+
+  PullServerAuxService(MetricsSystem ms) {
+    super("httpshuffle");
+    metrics = ms.register(new ShuffleMetrics());
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public PullServerAuxService() {
+    this(DefaultMetricsSystem.instance());
+  }
+
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplicationMaster
+   * @return the serialized form of the port.
+   */
+  public static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(port);
+    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+  }
+
+  /**
+   * A helper function to deserialize the metadata returned by PullServerAuxService.
+   * @param meta the metadata returned by the PullServerAuxService
+   * @return the port the PullServer Handler is listening on to serve shuffle data.
+   */
+  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+    //TODO this should be returning a class not just an int
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    return in.readInt();
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext appInitContext) {
+    // TODO these bytes should be versioned
+    // TODO: Once Shuffle is out of NM, this can use MR APIs
+    this.appId = appInitContext.getApplicationId();
+    this.queryId = TajoIdUtils.parseQueryId(appId.toString());
+    this.userName = appInitContext.getUser();
+    userRsrc.put(this.appId.toString(), this.userName);
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext appStopContext) {
+    userRsrc.remove(appStopContext.getApplicationId().toString());
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    try {
+      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+          DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+      ThreadFactory bossFactory = new ThreadFactoryBuilder()
+          .setNameFormat("PullServerAuxService Netty Boss #%d")
+          .build();
+      ThreadFactory workerFactory = new ThreadFactoryBuilder()
+          .setNameFormat("PullServerAuxService Netty Worker #%d")
+          .build();
+
+      selector = new NioServerSocketChannelFactory(
+          Executors.newCachedThreadPool(bossFactory),
+          Executors.newCachedThreadPool(workerFactory));
+
+      localFS = new LocalFileSystem();
+      super.init(new Configuration(conf));
+    } catch (Throwable t) {
+      LOG.error(t);
+    }
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public synchronized void start() {
+    Configuration conf = getConfig();
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    bootstrap.setPipelineFactory(pipelineFact);
+    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+        ConfVars.PULLSERVER_PORT.defaultIntVal);
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+    pipelineFact.PullServer.setPort(port);
+    LOG.info(getName() + " listening on port " + port);
+    super.start();
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public synchronized void stop() {
+    try {
+      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+      ServerBootstrap bootstrap = new ServerBootstrap(selector);
+      bootstrap.releaseExternalResources();
+      pipelineFact.destroy();
+
+      localFS.close();
+    } catch (Throwable t) {
+      LOG.error(t);
+    } finally {
+      super.stop();
+    }
+  }
+
+  @Override
+  public synchronized ByteBuffer getMetaData() {
+    try {
+      return serializeMetaData(port); 
+    } catch (IOException e) {
+      LOG.error("Error during getMeta", e);
+      // TODO add API to AuxiliaryServices to report failures
+      return null;
+    }
+  }
+
+  class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    final PullServer PullServer;
+    private SSLFactory sslFactory;
+
+    public HttpPipelineFactory(Configuration conf) throws Exception {
+      PullServer = new PullServer(conf);
+      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
+          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", PullServer);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+  }
+
+  class PullServer extends SimpleChannelUpstreamHandler {
+    private final Configuration conf;
+    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    private int port;
+
+    public PullServer(Configuration conf) {
+      this.conf = conf;
+      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
+    }
+    
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    private List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      HttpRequest request = (HttpRequest) e.getMessage();
+      if (request.getMethod() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
+      }
+
+      // Parsing the URL into key-values
+      final Map<String, List<String>> params =
+          new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> types = params.get("type");
+      final List<String> taskIdList = params.get("ta");
+      final List<String> subQueryIds = params.get("sid");
+      final List<String> partitionIds = params.get("p");
+
+      if (types == null || taskIdList == null || subQueryIds == null
+          || partitionIds == null) {
+        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
+            BAD_REQUEST);
+        return;
+      }
+
+      if (types.size() != 1 || subQueryIds.size() != 1) {
+        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
+            BAD_REQUEST);
+        return;
+      }
+
+      final List<FileChunk> chunks = Lists.newArrayList();
+
+      String repartitionType = types.get(0);
+      String sid = subQueryIds.get(0);
+      String partitionId = partitionIds.get(0);
+      List<String> taskIds = splitMaps(taskIdList);
+
+      // the working dir of tajo worker for each query
+      String queryBaseDir = queryId + "/output" + "/";
+
+      LOG.info("PullServer request param: repartitionType=" + repartitionType +
+          ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
+
+      String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
+      if (taskLocalDir == null ||
+          taskLocalDir.equals("")) {
+        LOG.error("Tajo local directory should be specified.");
+      }
+      LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
+
+      // if a subquery requires a range partitioning
+      if (repartitionType.equals("r")) {
+        String ta = taskIds.get(0);
+        Path path = localFS.makeQualified(
+            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
+                + ta + "/output/", conf));
+
+        String startKey = params.get("start").get(0);
+        String endKey = params.get("end").get(0);
+        boolean last = params.get("final") != null;
+
+        FileChunk chunk;
+        try {
+          chunk = getFileCunks(path, startKey, endKey, last);
+        } catch (Throwable t) {
+          LOG.error("ERROR Request: " + request.getUri(), t);
+          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+          return;
+        }
+        if (chunk != null) {
+          chunks.add(chunk);
+        }
+
+        // if a subquery requires a hash repartition  or a scattered hash repartition
+      } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
+        for (String ta : taskIds) {
+          Path path = localFS.makeQualified(
+              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
+                  ta + "/output/" + partitionId, conf));
+          File file = new File(path.toUri());
+          FileChunk chunk = new FileChunk(file, 0, file.length());
+          chunks.add(chunk);
+        }
+      } else {
+        LOG.error("Unknown repartition type: " + repartitionType);
+        sendError(ctx, "Unknown repartition type: " + repartitionType, BAD_REQUEST);
+        return;
+      }
+
+      // Write the content.
+      Channel ch = e.getChannel();
+      if (chunks.size() == 0) {
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+        ch.write(response);
+        if (!isKeepAlive(request)) {
+          ch.close();
+        }
+      }  else {
+        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        long totalSize = 0;
+        for (FileChunk chunk : file) {
+          totalSize += chunk.length();
+        }
+        setContentLength(response, totalSize);
+
+        // Write the initial line and the header.
+        ch.write(response);
+
+        ChannelFuture writeFuture = null;
+
+        for (FileChunk chunk : file) {
+          writeFuture = sendFile(ctx, ch, chunk);
+          if (writeFuture == null) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        }
+
+        // Decide whether to close the connection or not.
+        if (!isKeepAlive(request)) {
+          // Close the connection when the whole content is written out.
+          writeFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+      }
+    }
+
+    private ChannelFuture sendFile(ChannelHandlerContext ctx,
+                                   Channel ch,
+                                   FileChunk file) throws IOException {
+      RandomAccessFile spill;
+      try {
+        spill = new RandomAccessFile(file.getFile(), "r");
+      } catch (FileNotFoundException e) {
+        LOG.info(file.getFile() + " not found");
+        return null;
+      }
+      ChannelFuture writeFuture;
+      if (ch.getPipeline().get(SslHandler.class) == null) {
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            file.startOffset(), file.length(), manageOsCache, readaheadLength,
+            readaheadPool, file.getFile().getAbsolutePath());
+        writeFuture = ch.write(partition);
+        writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
+      } else {
+        // HTTPS cannot be done with zero copy.
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            file.startOffset(), file.length(), sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            file.getFile().getAbsolutePath());
+        writeFuture = ch.write(chunk);
+      }
+      metrics.shuffleConnections.incr();
+      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.setContent(
+        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      if (cause instanceof TooLongFrameException) {
+        sendError(ctx, BAD_REQUEST);
+        return;
+      }
+
+      LOG.error("PullServer error: ", cause);
+      if (ch.isConnected()) {
+        LOG.error("PullServer error " + e);
+        sendError(ctx, INTERNAL_SERVER_ERROR);
+      }
+    }
+  }
+
+  public FileChunk getFileCunks(Path outDir,
+                                String startKey,
+                                String endKey,
+                                boolean last) throws IOException {
+    BSTIndex index = new BSTIndex(new TajoConf());
+    BSTIndex.BSTIndexReader idxReader =
+        index.getIndexReader(new Path(outDir, "index"));
+    idxReader.open();
+    Schema keySchema = idxReader.getKeySchema();
+    TupleComparator comparator = idxReader.getComparator();
+
+    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+        + idxReader.getLastKey());
+
+    File data = new File(URI.create(outDir.toUri() + "/output"));
+    byte [] startBytes = Base64.decodeBase64(startKey);
+    byte [] endBytes = Base64.decodeBase64(endKey);
+
+    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+    Tuple start;
+    Tuple end;
+    try {
+      start = decoder.toTuple(startBytes);
+    } catch (Throwable t) {
+      throw new IllegalArgumentException("StartKey: " + startKey
+          + ", decoded byte size: " + startBytes.length, t);
+    }
+
+    try {
+      end = decoder.toTuple(endBytes);
+    } catch (Throwable t) {
+      throw new IllegalArgumentException("EndKey: " + endKey
+          + ", decoded byte size: " + endBytes.length, t);
+    }
+
+    if (!comparator.isAscendingFirstKey()) {
+      Tuple tmpKey = start;
+      start = end;
+      end = tmpKey;
+    }
+
+    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+        (last ? ", last=true" : "") + ")");
+
+    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+      LOG.info("There is no contents");
+      return null;
+    }
+
+    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+        comparator.compare(idxReader.getLastKey(), start) < 0) {
+      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+          "], but request start:" + start + ", end: " + end);
+      return null;
+    }
+
+    long startOffset;
+    long endOffset;
+    try {
+      startOffset = idxReader.find(start);
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: "
+          + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+    try {
+      endOffset = idxReader.find(end);
+      if (endOffset == -1) {
+        endOffset = idxReader.find(end, true);
+      }
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: "
+          + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+
+    // if startOffset == -1 then case 2-1 or case 3
+    if (startOffset == -1) { // this is a hack
+      // if case 2-1 or case 3
+      try {
+        startOffset = idxReader.find(start, true);
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end +")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+    }
+
+    if (startOffset == -1) {
+      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+          "State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+    }
+
+    // if greater than indexed values
+    if (last || (endOffset == -1
+        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+      endOffset = data.length();
+    }
+
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    LOG.info("Retrieve File Chunk: " + chunk);
+    return chunk;
+  }
+}

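For reference, messageReceived() above expects GET requests whose query string names the repartition type ("r", "h", or "s"), the subquery id, the partition id, and comma-separated task ids; range requests additionally carry Base64-encoded start/end keys. A hedged sketch of the two request shapes, with invented host, port, ids, and key variables:

    // Hash or scattered-hash repartition: type=h|s, one sid and p, many ta ids.
    String hashUrl = "http://worker1:8080/?type=h&sid=3&p=0&ta=1_0,2_0";

    // Range repartition: type=r plus Base64-encoded row-store keys; "final"
    // marks the last range. base64StartKey/base64EndKey are placeholders.
    String rangeUrl = "http://worker1:8080/?type=r&sid=3&p=0&ta=1_0"
        + "&start=" + base64StartKey + "&end=" + base64EndKey + "&final=true";
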
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
new file mode 100644
index 0000000..564950f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Method;
+
+public class PullServerUtil {
+  private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
+
+  private static boolean nativeIOPossible = false;
+  private static Method posixFadviseIfPossible;
+
+  static {
+    if (NativeIO.isAvailable() && loadNativeIO()) {
+      nativeIOPossible = true;
+    } else {
+      LOG.warn("Unable to load hadoop nativeIO");
+    }
+  }
+
+  public static boolean isNativeIOPossible() {
+    return nativeIOPossible;
+  }
+
+  /**
+   * Call posix_fadvise on the given file descriptor. See the manpage
+   * for this syscall for more information. On systems where this
+   * call is not available, does nothing.
+   */
+  public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
+                                            long offset, long len, int flags) {
+    if (nativeIOPossible) {
+      try {
+        posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+      } catch (Throwable t) {
+        nativeIOPossible = false;
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+
+  /* load hadoop native method if possible */
+  private static boolean loadNativeIO() {
+    boolean loaded = true;
+    if (nativeIOPossible) return loaded;
+
+    Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
+    try {
+      Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
+      Class posixClass;
+      if (getCacheManipulator != null) {
+        Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
+        posixClass = posix.getClass();
+      } else {
+        posixClass = NativeIO.POSIX.class;
+      }
+      posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
+    } catch (Throwable e) {
+      loaded = false;
+      LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
+    }
+
+    if (posixFadviseIfPossible == null) {
+      loaded = false;
+    }
+    return loaded;
+  }
+}

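PullServerUtil resolves the posix_fadvise hook reflectively because Hadoop moved it between NativeIO.POSIX and its CacheManipulator across versions. A hedged sketch of an advisory call after a file region has been served; path, fd, and length are placeholders, and the flag is Hadoop's NativeIO constant:

    import java.io.FileDescriptor;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // Ask the OS to drop the served bytes from its page cache,
    // if the native IO hook was loaded.
    if (PullServerUtil.isNativeIOPossible()) {
      PullServerUtil.posixFadviseIfPossible(path, fd, 0, length,
          NativeIO.POSIX.POSIX_FADV_DONTNEED);
    }
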
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
new file mode 100644
index 0000000..d030eed
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
+import org.apache.tajo.util.StringUtils;
+
+public class TajoPullServer extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TajoPullServer.class);
+
+  private TajoPullServerService pullService;
+  private TajoConf systemConf;
+
+  public TajoPullServer() {
+    super(TajoPullServer.class.getName());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.systemConf = (TajoConf)conf;
+    pullService = new TajoPullServerService();
+    addService(pullService);
+
+    super.init(conf);
+  }
+
+  public void startPullServer(TajoConf systemConf) {
+    init(systemConf);
+    start();
+  }
+
+  public void start() {
+    super.start();
+  }
+
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
+
+    if (!TajoPullServerService.isStandalone()) {
+      LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
+      return;
+    }
+
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+    (new TajoPullServer()).startPullServer(tajoConf);
+  }
+}

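TajoPullServer.main() refuses to start unless the TAJO_PULLSERVER_STANDALONE environment variable is 'true'. A minimal sketch of a programmatic start under that assumption, mirroring main(); the system conf file is resolved from the classpath:

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.TajoConstants;
    import org.apache.tajo.conf.TajoConf;

    // Assumes TAJO_PULLSERVER_STANDALONE=true is exported in the environment.
    TajoConf tajoConf = new TajoConf();
    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
    new TajoPullServer().startPullServer(tajoConf);
    // The bound port is written under $TAJO_PID_DIR/pullserver.port and can be
    // read back via TajoPullServerService.readPullServerPort().
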

[2/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.

Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
new file mode 100644
index 0000000..5a4e69f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -0,0 +1,808 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class TajoPullServerService extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
+
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private HttpPipelineFactory pipelineFact;
+  private int sslFileBufferSize;
+
+  private ApplicationId appId;
+  private FileSystem localFS;
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+
+  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+  private static final Map<String,String> userRsrc =
+    new ConcurrentHashMap<String,String>();
+  private String userName;
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "tajo.pullserver.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  private static boolean STANDALONE = false;
+
+  static {
+    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
+    if (!StringUtils.isEmpty(standalone)) {
+      STANDALONE = standalone.equalsIgnoreCase("true");
+    }
+  }
+
+  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+  static class ShuffleMetrics implements ChannelFutureListener {
+    @Metric({"OutputBytes","PullServer output in bytes"})
+    MutableCounterLong shuffleOutputBytes;
+    @Metric({"Failed","# of failed shuffle outputs"})
+    MutableCounterInt shuffleOutputsFailed;
+    @Metric({"Succeeded","# of succeeded shuffle outputs"})
+    MutableCounterInt shuffleOutputsOK;
+    @Metric({"Connections","# of current shuffle connections"})
+    MutableGaugeInt shuffleConnections;
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isSuccess()) {
+        shuffleOutputsOK.incr();
+      } else {
+        shuffleOutputsFailed.incr();
+      }
+      shuffleConnections.decr();
+    }
+  }
+
+  final ShuffleMetrics metrics;
+
+  TajoPullServerService(MetricsSystem ms) {
+    super("httpshuffle");
+    metrics = ms.register(new ShuffleMetrics());
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public TajoPullServerService() {
+    this(DefaultMetricsSystem.instance());
+  }
+
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplicationMaster
+   * @return the serialized form of the port.
+   */
+  public static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(port);
+    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+  }
+
+  /**
+   * A helper function to deserialize the metadata returned by PullServerAuxService.
+   * @param meta the metadata returned by the PullServerAuxService
+   * @return the port the PullServer Handler is listening on to serve shuffle data.
+   */
+  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+    //TODO this should be returning a class not just an int
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    return in.readInt();
+  }
+
+  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
+    // TODO these bytes should be versioned
+    // TODO: Once Shuffle is out of NM, this can use MR APIs
+    this.appId = appId;
+    this.userName = user;
+    userRsrc.put(appId.toString(), user);
+  }
+
+  public void stopApp(ApplicationId appId) {
+    userRsrc.remove(appId.toString());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+          DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+      int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
+          Runtime.getRuntime().availableProcessors() * 2);
+
+      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
+
+      localFS = new LocalFileSystem();
+
+      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
+          , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
+      super.init(conf);
+      LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
+    } catch (Throwable t) {
+      LOG.error(t);
+    }
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    bootstrap.setPipelineFactory(pipelineFact);
+
+    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+        ConfVars.PULLSERVER_PORT.defaultIntVal);
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+    pipelineFact.PullServer.setPort(port);
+    LOG.info(getName() + " listening on port " + port);
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
+
+    if (STANDALONE) {
+      File pullServerPortFile = getPullServerPortFile();
+      if (pullServerPortFile.exists()) {
+        pullServerPortFile.delete();
+      }
+      pullServerPortFile.getParentFile().mkdirs();
+      LOG.info("Write PullServerPort to " + pullServerPortFile);
+      FileOutputStream out = null;
+      try {
+        out = new FileOutputStream(pullServerPortFile);
+        out.write(("" + port).getBytes());
+      } catch (Exception e) {
+        LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
+            ", " + e.getMessage(), e);
+        System.exit(-1);
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    }
+    super.serviceInit(conf);
+    LOG.info("TajoPullServerService started: port=" + port);
+  }
+
+  public static boolean isStandalone() {
+    return STANDALONE;
+  }
+
+  private static File getPullServerPortFile() {
+    String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR");
+    if (StringUtils.isEmpty(pullServerPortInfoFile)) {
+      pullServerPortInfoFile = "/tmp";
+    }
+    return new File(pullServerPortInfoFile + "/pullserver.port");
+  }
+
+  // TODO change to get port from master or tajoConf
+  public static int readPullServerPort() {
+    FileInputStream in = null;
+    try {
+      File pullServerPortFile = getPullServerPortFile();
+
+      if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
+        return -1;
+      }
+      in = new FileInputStream(pullServerPortFile);
+      byte[] buf = new byte[1024];
+      int readBytes = in.read(buf);
+      return Integer.parseInt(new String(buf, 0, readBytes));
+    } catch (IOException e) {
+      LOG.fatal(e.getMessage(), e);
+      return -1;
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public synchronized void stop() {
+    try {
+      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+      ServerBootstrap bootstrap = new ServerBootstrap(selector);
+      bootstrap.releaseExternalResources();
+      pipelineFact.destroy();
+
+      localFS.close();
+    } catch (Throwable t) {
+      LOG.error(t);
+    } finally {
+      super.stop();
+    }
+  }
+
+  public synchronized ByteBuffer getMeta() {
+    try {
+      return serializeMetaData(port); 
+    } catch (IOException e) {
+      LOG.error("Error during getMeta", e);
+      // TODO add API to AuxiliaryServices to report failures
+      return null;
+    }
+  }
+
+  class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    final PullServer PullServer;
+    private SSLFactory sslFactory;
+
+    public HttpPipelineFactory(Configuration conf) throws Exception {
+      PullServer = new PullServer(conf);
+      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
+          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+
+      int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
+          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
+      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", PullServer);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+  }
+
+
+  Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
+
+  public void completeFileChunk(FileRegion filePart,
+                                   String requestUri,
+                                   long startTime) {
+    ProcessingStatus status = processingStatusMap.get(requestUri);
+    if (status != null) {
+      status.decrementRemainFiles(filePart, startTime);
+    }
+  }
+
+  class ProcessingStatus {
+    String requestUri;
+    int numFiles;
+    AtomicInteger remainFiles;
+    long startTime;
+    long makeFileListTime;
+    long minTime = Long.MAX_VALUE;
+    long maxTime;
+    int numSlowFile;
+
+    public ProcessingStatus(String requestUri) {
+      this.requestUri = requestUri;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    public void setNumFiles(int numFiles) {
+      this.numFiles = numFiles;
+      this.remainFiles = new AtomicInteger(numFiles);
+    }
+    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
+      synchronized(remainFiles) {
+        long fileSendTime = System.currentTimeMillis() - fileStartTime;
+        if (fileSendTime > 20 * 1000) {
+          LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
+          numSlowFile++;
+        }
+        if (fileSendTime > maxTime) {
+          maxTime = fileSendTime;
+        }
+        if (fileSendTime < minTime) {
+          minTime = fileSendTime;
+        }
+        int remain = remainFiles.decrementAndGet();
+        if (remain <= 0) {
+          processingStatusMap.remove(requestUri);
+          LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
+              "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
+              "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
+        }
+      }
+    }
+  }
+
+  class PullServer extends SimpleChannelUpstreamHandler {
+
+    private final Configuration conf;
+//    private final IndexCache indexCache;
+    private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    private int port;
+
+    public PullServer(Configuration conf) throws IOException {
+      this.conf = conf;
+//      indexCache = new IndexCache(new JobConf(conf));
+      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+          ConfVars.PULLSERVER_PORT.defaultIntVal);
+
+      // init local temporal dir
+      lDirAlloc.getAllLocalPathsToRead(".", conf);
+    }
+    
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    private List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+        throws Exception {
+
+      accepted.add(evt.getChannel());
+      LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
+      super.channelOpen(ctx, evt);
+
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      HttpRequest request = (HttpRequest) e.getMessage();
+      if (request.getMethod() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
+      }
+
+      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
+      processingStatusMap.put(request.getUri().toString(), processingStatus);
+      // Parsing the URL into key-values
+      final Map<String, List<String>> params =
+          new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> types = params.get("type");
+      final List<String> qids = params.get("qid");
+      final List<String> taskIdList = params.get("ta");
+      final List<String> subQueryIds = params.get("sid");
+      final List<String> partIds = params.get("p");
+      final List<String> offsetList = params.get("offset");
+      final List<String> lengthList = params.get("length");
+
+      if (types == null || subQueryIds == null || qids == null || partIds == null) {
+        sendError(ctx, "Required queryId, type, subquery Id, and part id",
+            BAD_REQUEST);
+        return;
+      }
+
+      if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+        sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id",
+            BAD_REQUEST);
+        return;
+      }
+
+      String partId = partIds.get(0);
+      String queryId = qids.get(0);
+      String shuffleType = types.get(0);
+      String sid = subQueryIds.get(0);
+
+      long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+      long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+      if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
+        sendError(ctx, "Required taskIds", BAD_REQUEST);
+      }
+
+      List<String> taskIds = splitMaps(taskIdList);
+
+      String queryBaseDir = queryId.toString() + "/output";
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PullServer request param: shuffleType=" + shuffleType +
+            ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
+
+        // the working dir of tajo worker for each query
+        LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+      }
+
+      final List<FileChunk> chunks = Lists.newArrayList();
+
+      // if a subquery requires a range shuffle
+      if (shuffleType.equals("r")) {
+        String ta = taskIds.get(0);
+        if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
+          LOG.warn("Range shuffle output does not exist: " + queryBaseDir + "/" + sid + "/" + ta + "/output/");
+          sendError(ctx, NO_CONTENT);
+          return;
+        }
+        Path path = localFS.makeQualified(
+            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
+        String startKey = params.get("start").get(0);
+        String endKey = params.get("end").get(0);
+        boolean last = params.get("final") != null;
+
+        FileChunk chunk;
+        try {
+          chunk = getFileCunks(path, startKey, endKey, last);
+        } catch (Throwable t) {
+          LOG.error("ERROR Request: " + request.getUri(), t);
+          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+          return;
+        }
+        if (chunk != null) {
+          chunks.add(chunk);
+        }
+
+        // if a subquery requires a hash shuffle or a scattered hash shuffle
+      } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+        int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
+        String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
+        if (!lDirAlloc.ifExists(partPath, conf)) {
+          LOG.warn("Partition shuffle file not exists: " + partPath);
+          sendError(ctx, NO_CONTENT);
+          return;
+        }
+
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
+
+        File file = new File(path.toUri());
+        long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+        long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+        if (startPos >= file.length()) {
+          String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
+          LOG.error(errorMessage);
+          sendError(ctx, errorMessage, BAD_REQUEST);
+          return;
+        }
+        LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+        FileChunk chunk = new FileChunk(file, startPos, readLen);
+        chunks.add(chunk);
+      } else {
+        LOG.error("Unknown shuffle type: " + shuffleType);
+        sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
+        return;
+      }
+
+      processingStatus.setNumFiles(chunks.size());
+      processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
+      // Write the content.
+      Channel ch = e.getChannel();
+      if (chunks.size() == 0) {
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+        ch.write(response);
+        if (!isKeepAlive(request)) {
+          ch.close();
+        }
+      }  else {
+        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        long totalSize = 0;
+        for (FileChunk chunk : file) {
+          totalSize += chunk.length();
+        }
+        setContentLength(response, totalSize);
+
+        // Write the initial line and the header.
+        ch.write(response);
+
+        ChannelFuture writeFuture = null;
+
+        for (FileChunk chunk : file) {
+          writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
+          if (writeFuture == null) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        }
+
+        // Decide whether to close the connection or not.
+        if (!isKeepAlive(request)) {
+          // Close the connection when the whole content is written out.
+          writeFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+      }
+    }
+
+    private ChannelFuture sendFile(ChannelHandlerContext ctx,
+                                   Channel ch,
+                                   FileChunk file,
+                                   String requestUri) throws IOException {
+      long startTime = System.currentTimeMillis();
+      RandomAccessFile spill = null;
+      ChannelFuture writeFuture;
+      try {
+        spill = new RandomAccessFile(file.getFile(), "r");
+        if (ch.getPipeline().get(SslHandler.class) == null) {
+          final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
+              file.startOffset(), file.length(), manageOsCache, readaheadLength,
+              readaheadPool, file.getFile().getAbsolutePath());
+          writeFuture = ch.write(filePart);
+          writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
+        } else {
+          // HTTPS cannot be done with zero copy.
+          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+              file.startOffset(), file.length(), sslFileBufferSize,
+              manageOsCache, readaheadLength, readaheadPool,
+              file.getFile().getAbsolutePath());
+          writeFuture = ch.write(chunk);
+        }
+      } catch (FileNotFoundException e) {
+        LOG.info(file.getFile() + " not found");
+        return null;
+      } catch (Throwable e) {
+        LOG.error("Failed to send " + file.getFile(), e);
+        if (spill != null) {
+          // should close the file opened above
+          spill.close();
+        }
+        return null;
+      }
+      metrics.shuffleConnections.incr();
+      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.setContent(
+        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      LOG.error(e.getCause().getMessage(), e.getCause());
+      // if channel.close() is not called, files opened for this request are never closed
+      if (ctx.getChannel().isConnected()){
+        ctx.getChannel().close();
+      }
+    }
+  }
+
+  public static FileChunk getFileCunks(Path outDir,
+                                      String startKey,
+                                      String endKey,
+                                      boolean last) throws IOException {
+    BSTIndex index = new BSTIndex(new TajoConf());
+    BSTIndex.BSTIndexReader idxReader =
+        index.getIndexReader(new Path(outDir, "index"));
+    idxReader.open();
+    Schema keySchema = idxReader.getKeySchema();
+    TupleComparator comparator = idxReader.getComparator();
+
+    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+        + idxReader.getLastKey());
+
+    File data = new File(URI.create(outDir.toUri() + "/output"));
+    byte [] startBytes = Base64.decodeBase64(startKey);
+    byte [] endBytes = Base64.decodeBase64(endKey);
+
+    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+    Tuple start;
+    Tuple end;
+    try {
+      start = decoder.toTuple(startBytes);
+    } catch (Throwable t) {
+      throw new IllegalArgumentException("StartKey: " + startKey
+          + ", decoded byte size: " + startBytes.length, t);
+    }
+
+    try {
+      end = decoder.toTuple(endBytes);
+    } catch (Throwable t) {
+      throw new IllegalArgumentException("EndKey: " + endKey
+          + ", decoded byte size: " + endBytes.length, t);
+    }
+
+    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+        (last ? ", last=true" : "") + ")");
+
+    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+      LOG.info("There is no contents");
+      return null;
+    }
+
+    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+        comparator.compare(idxReader.getLastKey(), start) < 0) {
+      LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+          "], but request start:" + start + ", end: " + end);
+      return null;
+    }
+
+    long startOffset;
+    long endOffset;
+    try {
+      startOffset = idxReader.find(start);
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: "
+          + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+    try {
+      endOffset = idxReader.find(end);
+      if (endOffset == -1) {
+        endOffset = idxReader.find(end, true);
+      }
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: "
+          + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+
+    // startOffset == -1 means the exact start key is absent from the index;
+    // retry allowing an inexact (nearest-key) match
+    if (startOffset == -1) {
+      try {
+        startOffset = idxReader.find(start, true);
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end +")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+    }
+
+    if (startOffset == -1) {
+      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+          "State Dump (the requested range: "
+          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+    }
+
+    // if greater than indexed values
+    if (last || (endOffset == -1
+        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+      endOffset = data.length();
+    }
+
+    idxReader.close();
+
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    LOG.info("Retrieve File Chunk: " + chunk);
+    return chunk;
+  }
+}
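
A note on the wire protocol implicit in the handler above: a fetch is a plain
HTTP GET whose query string carries the shuffle parameters the handler parses
(query id, execution block id, partition id, shuffle type, and task ids; range
shuffle additionally sends Base64-encoded start/end keys and an optional
"final" flag). A minimal sketch of building such a URL, assuming the parameter
names used by the handler and a hypothetical worker host and port:

import java.net.URI;

public class PullUrlSketch {
  public static void main(String[] args) {
    // All concrete values below are hypothetical.
    URI uri = URI.create("http://worker01:8080/?"
        + "qid=q_1417941600000_0001"  // query id
        + "&sid=2"                    // execution block (subquery) id
        + "&p=0"                      // partition id
        + "&type=h"                   // "h"=hash, "s"=scattered hash, "r"=range
        + "&ta=3_0");                 // taskId_attemptId list (comma-separated)
    System.out.println(uri);
  }
}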

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..67e7423
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.pullserver.FileAccessForbiddenException;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AdvancedDataRetriever implements DataRetriever {
+  private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+  public AdvancedDataRetriever() {
+  }
+  
+  public void register(String taskAttemptId, RetrieverHandler handler) {
+    synchronized (handlerMap) {
+      if (!handlerMap.containsKey(taskAttemptId)) {
+        handlerMap.put(taskAttemptId, handler);
+      }
+    } 
+  }
+  
+  public void unregister(String taskAttemptId) {
+    synchronized (handlerMap) {
+      if (handlerMap.containsKey(taskAttemptId)) {
+        handlerMap.remove(taskAttemptId);
+      }
+    }
+  }
+
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    final Map<String, List<String>> params =
+      new QueryStringDecoder(request.getUri()).getParameters();
+
+    if (!params.containsKey("qid")) {
+      throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
+    }
+
+    if (params.containsKey("sid")) {
+      List<FileChunk> chunks = Lists.newArrayList();
+      List<String> queryUnidIds = splitMaps(params.get("qid"));
+      for (String eachQueryUnitId : queryUnidIds) {
+        String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
+        ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
+        QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
+
+        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
+
+        RetrieverHandler handler = handlerMap.get(attemptId.toString());
+        FileChunk chunk = handler.get(params);
+        chunks.add(chunk);
+      }
+      return chunks.toArray(new FileChunk[chunks.size()]);
+    } else {
+      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
+      FileChunk chunk = handler.get(params);
+      if (chunk == null) {
+        // no content corresponds to the requested qid
+        return null;
+      }
+
+      File file = chunk.getFile();
+      if (file.isHidden() || !file.exists()) {
+        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+      }
+      if (!file.isFile()) {
+        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not a regular file");
+      }
+
+      return new FileChunk[] {chunk};
+    }
+  }
+
+  private List<String> splitMaps(List<String> qids) {
+    if (null == qids) {
+      LOG.error("QueryUnitId is EMPTY");
+      return null;
+    }
+
+    final List<String> ret = new ArrayList<String>();
+    for (String qid : qids) {
+      Collections.addAll(ret, qid.split(","));
+    }
+    return ret;
+  }
+}
\ No newline at end of file
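
A usage sketch for the retriever above: handlers are registered per task
attempt id, and handle() routes each decoded "qid" to its handler. The handler
body and all ids below are hypothetical stand-ins:

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.tajo.pullserver.retriever.AdvancedDataRetriever;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.pullserver.retriever.RetrieverHandler;

public class RetrieverWiringSketch {
  public static void main(String[] args) {
    AdvancedDataRetriever retriever = new AdvancedDataRetriever();
    String attemptId = "ta_0001_000002_000003_00";  // hypothetical attempt id
    retriever.register(attemptId, new RetrieverHandler() {
      @Override
      public FileChunk get(Map<String, List<String>> kvs) throws IOException {
        File f = new File("/tmp/tajo/output/part-0");  // hypothetical path
        return new FileChunk(f, 0, f.length());
      }
    });
    // ... serve requests through handle(ctx, request) ...
    retriever.unregister(attemptId);
  }
}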

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..8f55f7b
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+
+public interface DataRetriever {
+  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..dc63929
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.pullserver.FileAccessForbiddenException;
+import org.apache.tajo.pullserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class DirectoryRetriever implements DataRetriever {
+  public String baseDir;
+  
+  public DirectoryRetriever(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+    if (path == null) {
+      throw new IllegalArgumentException("Wrong path: " +path);
+    }
+
+    File file = new File(baseDir, path);
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException("No such file: "
+          + baseDir + "/" + path); 
+    }
+    
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}
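
DirectoryRetriever simply resolves the sanitized request path against a fixed
base directory and serves the whole file as one chunk. A fragment showing the
intended call pattern; the base directory is hypothetical, and ctx/request are
the arguments a Netty handler receives in messageReceived():

// Inside a Netty 3 channel handler (ctx and request in scope):
DataRetriever retriever = new DirectoryRetriever("/var/lib/tajo/output");  // hypothetical
FileChunk[] chunks = retriever.handle(ctx, request);
// chunks has exactly one element covering the whole requested file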

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
new file mode 100644
index 0000000..67cff21
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+public class FileChunk {
+  private final File file;
+  private final long startOffset;
+  private long length;
+
+  /**
+   * True if this file was created by fetching data from a remote host (e.g., via an HTTP request); false otherwise.
+   */
+  private boolean fromRemote;
+
+  /**
+   * ExecutionBlockId
+   */
+  private String ebId;
+
+  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+    this.file = file;
+    this.startOffset = startOffset;
+    this.length = length;
+  }
+
+  public File getFile() {
+    return this.file;
+  }
+
+  public long startOffset() {
+    return this.startOffset;
+  }
+
+  public long length() {
+    return this.length;
+  }
+
+  public void setLength(long newLength) {
+    this.length = newLength;
+  }
+
+  public boolean fromRemote() {
+    return this.fromRemote;
+  }
+
+  public void setFromRemote(boolean newVal) {
+    this.fromRemote = newVal;
+  }
+
+  public String getEbId() {
+    return this.ebId;
+  }
+
+  public void setEbId(String newVal) {
+    this.ebId = newVal;
+  }
+
+  public String toString() {
+    return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
+        + file.getAbsolutePath();
+  }
+}
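
FileChunk is just a (file, startOffset, length) triple plus bookkeeping; the
sender ultimately reads exactly that byte range. A self-contained sketch of
consuming a chunk with RandomAccessFile (path and offsets hypothetical, and
the file is assumed to be at least startOffset + length bytes long):

import java.io.File;
import java.io.RandomAccessFile;
import org.apache.tajo.pullserver.retriever.FileChunk;

public class FileChunkReadSketch {
  public static void main(String[] args) throws Exception {
    File f = new File("/tmp/part-0");               // hypothetical file
    FileChunk chunk = new FileChunk(f, 128, 4096);  // bytes [128, 128 + 4096)
    try (RandomAccessFile raf = new RandomAccessFile(chunk.getFile(), "r")) {
      byte[] buf = new byte[(int) chunk.length()];
      raf.seek(chunk.startOffset());
      raf.readFully(buf);  // buf now holds exactly the advertised range
    }
  }
}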

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..5567c0d
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface RetrieverHandler {
+  /**
+   *
+   * @param kvs url-decoded key/value pairs
+   * @return a desired part of a file
+   * @throws java.io.IOException
+   */
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
+}
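
A minimal RetrieverHandler sketch that serves one partition file per request;
the directory layout and the use of a "p" parameter are hypothetical, not part
of the patch itself:

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.pullserver.retriever.RetrieverHandler;

public class PartitionFileHandler implements RetrieverHandler {
  private final File baseDir;

  public PartitionFileHandler(File baseDir) {
    this.baseDir = baseDir;
  }

  @Override
  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
    // serve the partition file named by the "p" parameter (layout hypothetical)
    String part = kvs.containsKey("p") ? kvs.get("p").get(0) : "0";
    File f = new File(baseDir, "part-" + part);
    return new FileChunk(f, 0, f.length());
  }
}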

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/pom.xml b/tajo-yarn-pullserver/pom.xml
deleted file mode 100644
index a7644a1..0000000
--- a/tajo-yarn-pullserver/pom.xml
+++ /dev/null
@@ -1,146 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>tajo-project</artifactId>
-    <groupId>org.apache.tajo</groupId>
-    <version>0.9.1-SNAPSHOT</version>
-    <relativePath>../tajo-project</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <name>Tajo Core PullServer</name>
-  <artifactId>tajo-yarn-pullserver</artifactId>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>verify</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-storage</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-  <profiles>
-    <profile>
-      <id>docs</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-javadoc-plugin</artifactId>
-            <executions>
-              <execution>
-                <!-- build javadoc jars per jar for publishing to maven -->
-                <id>module-javadocs</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>jar</goal>
-                </goals>
-                <configuration>
-                  <destDir>${project.build.directory}</destDir>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-project-info-reports-plugin</artifactId>
-        <version>2.4</version>
-        <configuration>
-          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-        </configuration>
-      </plugin>
-    </plugins>
-  </reporting>
-
-</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
deleted file mode 100644
index b0b8d18..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-public class FadvisedChunkedFile extends ChunkedFile {
-
-  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
-
-  private final boolean manageOsCache;
-  private final int readaheadLength;
-  private final ReadaheadPool readaheadPool;
-  private final FileDescriptor fd;
-  private final String identifier;
-
-  private ReadaheadPool.ReadaheadRequest readaheadRequest;
-
-  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
-                             int chunkSize, boolean manageOsCache, int readaheadLength,
-                             ReadaheadPool readaheadPool, String identifier) throws IOException {
-    super(file, position, count, chunkSize);
-    this.manageOsCache = manageOsCache;
-    this.readaheadLength = readaheadLength;
-    this.readaheadPool = readaheadPool;
-    this.fd = file.getFD();
-    this.identifier = identifier;
-  }
-
-  @Override
-  public Object nextChunk() throws Exception {
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
-    }
-    return super.nextChunk();
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
-      try {
-        PullServerUtil.posixFadviseIfPossible(identifier,
-            fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
-            NativeIO.POSIX.POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
-      }
-    }
-    super.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
deleted file mode 100644
index 18cf4b6..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-
-public class FadvisedFileRegion extends DefaultFileRegion {
-
-  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
-
-  private final boolean manageOsCache;
-  private final int readaheadLength;
-  private final ReadaheadPool readaheadPool;
-  private final FileDescriptor fd;
-  private final String identifier;
-  private final long count;
-  private final long position;
-  private final int shuffleBufferSize;
-  private final boolean shuffleTransferToAllowed;
-  private final FileChannel fileChannel;
-
-  private ReadaheadPool.ReadaheadRequest readaheadRequest;
-  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-
-  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
-                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-                            String identifier) throws IOException {
-    this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
-        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
-  }
-
-  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
-                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-                            String identifier, int shuffleBufferSize,
-                            boolean shuffleTransferToAllowed) throws IOException {
-    super(file.getChannel(), position, count);
-    this.manageOsCache = manageOsCache;
-    this.readaheadLength = readaheadLength;
-    this.readaheadPool = readaheadPool;
-    this.fd = file.getFD();
-    this.identifier = identifier;
-    this.fileChannel = file.getChannel();
-    this.count = count;
-    this.position = position;
-    this.shuffleBufferSize = shuffleBufferSize;
-    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
-  }
-
-  @Override
-  public long transferTo(WritableByteChannel target, long position)
-      throws IOException {
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          getPosition() + position, readaheadLength,
-          getPosition() + getCount(), readaheadRequest);
-    }
-
-    if(this.shuffleTransferToAllowed) {
-      return super.transferTo(target, position);
-    } else {
-      return customShuffleTransfer(target, position);
-    }
-  }
-
-  /**
-   * This method transfers data using local buffer. It transfers data from
-   * a disk to a local buffer in memory, and then it transfers data from the
-   * buffer to the target. This is used only if transferTo is disallowed in
-   * the configuration file. super.TransferTo does not perform well on Windows
-   * due to a small IO request generated. customShuffleTransfer can control
-   * the size of the IO requests by changing the size of the intermediate
-   * buffer.
-   */
-  @VisibleForTesting
-  long customShuffleTransfer(WritableByteChannel target, long position)
-      throws IOException {
-    long actualCount = this.count - position;
-    if (actualCount < 0 || position < 0) {
-      throw new IllegalArgumentException(
-          "position out of range: " + position +
-              " (expected: 0 - " + (this.count - 1) + ')');
-    }
-    if (actualCount == 0) {
-      return 0L;
-    }
-
-    long trans = actualCount;
-    int readSize;
-    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
-
-    while(trans > 0L &&
-        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
-      //adjust counters and buffer limit
-      if(readSize < trans) {
-        trans -= readSize;
-        position += readSize;
-        byteBuffer.flip();
-      } else {
-        //We can read more than we need if the actualCount is not multiple
-        //of the byteBuffer size and file is big enough. In that case we cannot
-        //use flip method but we need to set buffer limit manually to trans.
-        byteBuffer.limit((int)trans);
-        byteBuffer.position(0);
-        position += trans;
-        trans = 0;
-      }
-
-      //write data to the target
-      while(byteBuffer.hasRemaining()) {
-        target.write(byteBuffer);
-      }
-
-      byteBuffer.clear();
-    }
-
-    return actualCount - trans;
-  }
-
-
-  @Override
-  public void releaseExternalResources() {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    super.releaseExternalResources();
-  }
-
-  /**
-   * Call when the transfer completes successfully so we can advise the OS that
-   * we don't need the region to be cached anymore.
-   */
-  public void transferSuccessful() {
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
-      try {
-        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
-            NativeIO.POSIX.POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
deleted file mode 100644
index c703f6f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
-  private static final long serialVersionUID = -3383272565826389213L;
-
-  public FileAccessForbiddenException() {
-  }
-
-  public FileAccessForbiddenException(String message) {
-    super(message);
-  }
-
-  public FileAccessForbiddenException(Throwable cause) {
-    super(cause);
-  }
-
-  public FileAccessForbiddenException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
deleted file mode 100644
index 236db89..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-public class FileCloseListener implements ChannelFutureListener {
-
-  private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
-
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
-    this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
-  }
-
-  // TODO error handling; distinguish IO/connection failures,
-  //      attribute to appropriate spill output
-  @Override
-  public void operationComplete(ChannelFuture future) {
-    if(future.isSuccess()){
-      filePart.transferSuccessful();
-    }
-    filePart.releaseExternalResources();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
deleted file mode 100644
index 31db15c..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.pullserver.retriever.DataRetriever;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.*;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
-  private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
-
-  Map<ExecutionBlockId, DataRetriever> retrievers =
-      new ConcurrentHashMap<ExecutionBlockId, DataRetriever>();
-  private String userName;
-  private String appId;
-
-  public HttpDataServerHandler(String userName, String appId) {
-    this.userName= userName;
-    this.appId = appId;
-  }
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-      throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    String base =
-        ContainerLocalizer.USERCACHE + "/" + userName + "/"
-            + ContainerLocalizer.APPCACHE + "/"
-            + appId + "/output" + "/";
-
-    final Map<String, List<String>> params =
-        new QueryStringDecoder(request.getUri()).getParameters();
-
-    List<FileChunk> chunks = Lists.newArrayList();
-    List<String> taskIds = splitMaps(params.get("ta"));
-    int sid = Integer.valueOf(params.get("sid").get(0));
-    int partitionId = Integer.valueOf(params.get("p").get(0));
-    for (String ta : taskIds) {
-
-      File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId);
-      FileChunk chunk = new FileChunk(file, 0, file.length());
-      chunks.add(chunk);
-    }
-
-    FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-//    try {
-//      file = retriever.handle(ctx, request);
-//    } catch (FileNotFoundException fnf) {
-//      LOG.error(fnf);
-//      sendError(ctx, NOT_FOUND);
-//      return;
-//    } catch (IllegalArgumentException iae) {
-//      LOG.error(iae);
-//      sendError(ctx, BAD_REQUEST);
-//      return;
-//    } catch (FileAccessForbiddenException fafe) {
-//      LOG.error(fafe);
-//      sendError(ctx, FORBIDDEN);
-//      return;
-//    } catch (IOException ioe) {
-//      LOG.error(ioe);
-//      sendError(ctx, INTERNAL_SERVER_ERROR);
-//      return;
-//    }
-
-    // Write the content.
-    Channel ch = e.getChannel();
-    if (file == null) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-      ch.write(response);
-      if (!isKeepAlive(request)) {
-        ch.close();
-      }
-    }  else {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      long totalSize = 0;
-      for (FileChunk chunk : file) {
-        totalSize += chunk.length();
-      }
-      setContentLength(response, totalSize);
-
-      // Write the initial line and the header.
-      ch.write(response);
-
-      ChannelFuture writeFuture = null;
-
-      for (FileChunk chunk : file) {
-        writeFuture = sendFile(ctx, ch, chunk);
-        if (writeFuture == null) {
-          sendError(ctx, NOT_FOUND);
-          return;
-        }
-      }
-
-      // Decide whether to close the connection or not.
-      if (!isKeepAlive(request)) {
-        // Close the connection when the whole content is written out.
-        writeFuture.addListener(ChannelFutureListener.CLOSE);
-      }
-    }
-  }
-
-  private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                 Channel ch,
-                                 FileChunk file) throws IOException {
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file.getFile(), "r");
-    } catch (FileNotFoundException fnfe) {
-      return null;
-    }
-
-    ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
-      // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
-          file.length(), 8192));
-    } else {
-      // No encryption - use zero-copy.
-      final FileRegion region = new DefaultFileRegion(raf.getChannel(),
-          file.startOffset(), file.length());
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
-        }
-      });
-    }
-
-    return writeFuture;
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  public static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        throw new Error();
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something serious in the production environment.
-    if (uri.contains(File.separator + ".")
-        || uri.contains("." + File.separator) || uri.startsWith(".")
-        || uri.endsWith(".")) {
-      return null;
-    }
-
-    // Convert to absolute path.
-    return uri;
-  }
-
-  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-
-  private List<String> splitMaps(List<String> qids) {
-    if (null == qids) {
-      LOG.error("QueryUnitId is EMPTY");
-      return null;
-    }
-
-    final List<String> ret = new ArrayList<String>();
-    for (String qid : qids) {
-      Collections.addAll(ret, qid.split(","));
-    }
-    return ret;
-  }
-}
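
sanitizeUri() above (carried over into tajo-pullserver by this commit) is the
only guard against path traversal: it URL-decodes the path, converts
separators, and rejects anything containing dot segments. A quick
illustration of its accept/reject behavior:

// On a Unix filesystem:
HttpDataServerHandler.sanitizeUri("/q_0001/2/output");  // returns the path unchanged
HttpDataServerHandler.sanitizeUri("/../etc/passwd");    // returns null (rejected)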

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 4c8bd8b..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
-  private String userName;
-  private String appId;
-  public HttpDataServerPipelineFactory(String userName, String appId) {
-    this.userName = userName;
-    this.appId = appId;
-  }
-
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following line if you want HTTPS
-    // SSLEngine engine =
-    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    // engine.setUseClientMode(false);
-    // pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-    pipeline.addLast("deflater", new HttpContentCompressor());
-    pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
-    return pipeline;
-  }
-}
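
For context, a pipeline factory like the one above plugs into a standard
Netty 3 ServerBootstrap. A minimal sketch, with hypothetical user name,
application id, and port:

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.apache.tajo.pullserver.HttpDataServerPipelineFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class HttpDataServerSketch {
  public static void main(String[] args) {
    ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),     // boss threads
            Executors.newCachedThreadPool()));   // worker threads
    bootstrap.setPipelineFactory(
        new HttpDataServerPipelineFactory("tajo", "application_0001"));  // hypothetical
    bootstrap.bind(new InetSocketAddress(8080));                         // hypothetical port
  }
}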

http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
deleted file mode 100644
index 2cbb101..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
-  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
-    return getParamsFromQuery(uri.getQuery());
-  }
-
-  /**
-   * It parses a query string into key/value pairs
-   *
-   * @param queryString decoded query string
-   * @return key/value pairs parsed from a given query string
-   * @throws java.io.UnsupportedEncodingException
-   */
-  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
-    String [] queries = queryString.split("&");
-
-    Map<String,String> params = Maps.newHashMap();
-    String [] param;
-    for (String q : queries) {
-      param = q.split("=");
-      params.put(param[0], param[1]);
-    }
-
-    return params;
-  }
-
-  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-
-    boolean first = true;
-    for (Map.Entry<String,String> param : params.entrySet()) {
-      if (!first) {
-        sb.append("&");
-      }
-      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
-          append("=").
-          append(URLEncoder.encode(param.getValue(), "UTF-8"));
-      first = false;
-    }
-
-    return sb.toString();
-  }
-}
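
buildQuery() and getParamsFromQuery() above (moved to tajo-pullserver by this
commit) are rough inverses as long as the values need no URL escaping. A small
round-trip sketch:

import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.tajo.pullserver.HttpUtil;

public class HttpUtilSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> params = Maps.newHashMap();
    params.put("qid", "0_1");
    params.put("sid", "2");
    String query = HttpUtil.buildQuery(params);            // e.g. "qid=0_1&sid=2"
    Map<String, String> back = HttpUtil.getParamsFromQuery(query);
    System.out.println(back.get("qid"));                   // prints 0_1
  }
}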