Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:08 UTC

[1/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

Repository: tajo
Updated Branches:
  refs/heads/master 71193b218 -> 73ac4b87d


http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
new file mode 100644
index 0000000..17a753f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private FileChannel fileChannel;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier) throws IOException {
+    this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+  }
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier, int shuffleBufferSize,
+                            boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    }
+  }
+
+  /**
+   * This method transfers data using a local buffer. It reads data from
+   * disk into an in-memory buffer and then writes the buffer to the
+   * target. It is used only when transferTo is disallowed in the
+   * configuration file. super.transferTo does not perform well on Windows
+   * because it generates small IO requests; customShuffleTransfer can
+   * control the size of the IO requests via the size of the intermediate
+   * buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+              " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if actualCount is not a multiple
+        //of the byteBuffer size and the file is big enough. In that case we
+        //cannot use flip(); we must set the buffer limit to trans manually.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans;
+        trans = 0;
+      }
+
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+
+      byteBuffer.clear();
+    }
+
+    return actualCount - trans;
+  }
+
+
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+      readaheadRequest = null;
+    }
+    fileChannel = null;
+    super.releaseExternalResources();
+  }
+
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore.
+   */
+  public void transferSuccessful() {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0 && fileChannel != null) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}
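
For reference, the buffered-copy technique in customShuffleTransfer() above can be exercised in isolation. Below is a minimal sketch of the same loop, assuming a readable FileChannel and an open target; the class and method names are illustrative, not part of this patch.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public class BufferedTransferSketch {
  /** Copies count bytes from src starting at position into target through a bounded buffer. */
  static long copy(FileChannel src, WritableByteChannel target,
                   long position, long count, int bufferSize) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(bufferSize);
    long remaining = count;
    int read;
    while (remaining > 0 && (read = src.read(buf, position)) > 0) {
      buf.flip();
      if (read > remaining) {
        // cap the buffer at the bytes still needed, as customShuffleTransfer does
        buf.limit((int) remaining);
      }
      position += buf.remaining();
      remaining -= buf.remaining();
      while (buf.hasRemaining()) {
        target.write(buf); // drain the buffer; partial writes are retried
      }
      buf.clear();
    }
    return count - remaining; // bytes actually transferred
  }
}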

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
new file mode 100644
index 0000000..a85d0b9
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+public class FileCloseListener implements ChannelFutureListener {
+
+  private FadvisedFileRegion filePart;
+
+  public FileCloseListener(FadvisedFileRegion filePart) {
+    this.filePart = filePart;
+  }
+
+  // TODO error handling; distinguish IO/connection failures,
+  //      attribute to appropriate spill output
+  @Override
+  public void operationComplete(ChannelFuture future) {
+    if(future.isSuccess()){
+      filePart.transferSuccessful();
+    }
+    filePart.releaseExternalResources();
+  }
+}
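
Usage note: the listener is attached to the write future of a FadvisedFileRegion, so the fadvise hint and the resource release happen only after Netty finishes the transfer. A minimal sketch, with channel and region construction assumed (this mirrors sendFile() in TajoPullServerService below):

// ch is an org.jboss.netty.channel.Channel, filePart a FadvisedFileRegion
ChannelFuture writeFuture = ch.write(filePart);
// on success the listener issues the DONTNEED hint; it always releases resources
writeFuture.addListener(new FileCloseListener(filePart));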

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
new file mode 100644
index 0000000..369e17f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class TajoPullServerService extends AuxiliaryService {
+
+  private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
+
+  private TajoConf tajoConf;
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup("Pull server group");
+  private HttpChannelInitializer channelInitializer;
+  private int sslFileBufferSize;
+  private int maxUrlLength;
+
+  private ApplicationId appId;
+  private FileSystem localFS;
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+  private static final Map<String,String> userRsrc =
+          new ConcurrentHashMap<>();
+  private String userName;
+
+  private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+  private int lowCacheHitCheckThreshold;
+
+  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+  static class ShuffleMetrics implements ChannelFutureListener {
+    @Metric({"OutputBytes","PullServer output in bytes"})
+    MutableCounterLong shuffleOutputBytes;
+    @Metric({"Failed","# of failed shuffle outputs"})
+    MutableCounterInt shuffleOutputsFailed;
+    @Metric({"Succeeded","# of succeeded shuffle outputs"})
+    MutableCounterInt shuffleOutputsOK;
+    @Metric({"Connections","# of current shuffle connections"})
+    MutableGaugeInt shuffleConnections;
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isSuccess()) {
+        shuffleOutputsOK.incr();
+      } else {
+        shuffleOutputsFailed.incr();
+      }
+      shuffleConnections.decr();
+    }
+  }
+
+  final ShuffleMetrics metrics;
+
+  TajoPullServerService(MetricsSystem ms) {
+    super(PullServerConstants.PULLSERVER_SERVICE_NAME);
+    metrics = ms.register(new ShuffleMetrics());
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public TajoPullServerService() {
+    this(DefaultMetricsSystem.instance());
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+    // TODO these bytes should be versioned
+    // TODO: Once Shuffle is out of NM, this can use MR APIs
+    String user = context.getUser();
+    ApplicationId appId = context.getApplicationId();
+    //    ByteBuffer secret = context.getApplicationDataForService();
+    userRsrc.put(appId.toString(), user);
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+    userRsrc.remove(context.getApplicationId().toString());
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    tajoConf = new TajoConf(conf);
+
+    manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+        PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+        PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+    int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
+
+    ThreadFactory bossFactory = new ThreadFactoryBuilder()
+        .setNameFormat("TajoPullServerService Netty Boss #%d")
+        .build();
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setNameFormat("TajoPullServerService Netty Worker #%d")
+        .build();
+    selector = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory),
+        workerNum);
+
+    localFS = new LocalFileSystem();
+
+    maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+
+    LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
+
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    try {
+      channelInitializer = new HttpChannelInitializer(tajoConf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    bootstrap.setPipelineFactory(channelInitializer);
+
+    port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+    LOG.info(getName() + " listening on port " + port);
+
+    sslFileBufferSize = tajoConf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
+    int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
+    int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+
+    indexReaderCache = CacheBuilder.newBuilder()
+        .maximumSize(cacheSize)
+        .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+        .removalListener(removalListener)
+        .build(
+            new CacheLoader<IndexCacheKey, BSTIndexReader>() {
+              @Override
+              public BSTIndexReader load(IndexCacheKey key) throws Exception {
+                return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
+              }
+            }
+        );
+    lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
+
+    super.serviceInit(tajoConf);
+    LOG.info("TajoPullServerService started: port=" + port);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    // TODO: check this wait
+    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    if (selector != null) {
+      ServerBootstrap bootstrap = new ServerBootstrap(selector);
+      bootstrap.releaseExternalResources();
+    }
+
+    if (channelInitializer != null) {
+      channelInitializer.destroy();
+    }
+
+    localFS.close();
+    indexReaderCache.invalidateAll();
+
+    super.serviceStop();
+  }
+
+  @VisibleForTesting
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    try {
+      return serializeMetaData(port);
+    } catch (IOException e) {
+      LOG.error("Error during getMeta", e);
+      // TODO add API to AuxiliaryServices to report failures
+      return null;
+    }
+  }
+
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplicationMaster
+   * @return the serialized form of the port.
+   */
+  public static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    ByteBuffer buf = ByteBuffer.allocate(SizeOf.SIZE_OF_INT).putInt(port);
+    buf.flip(); // rewind so that consumers read the port from position 0
+    return buf;
+  }
+
+  class HttpChannelInitializer implements ChannelPipelineFactory {
+
+    final PullServer pullServer;
+    private SSLFactory sslFactory;
+
+    public HttpChannelInitializer(TajoConf conf) throws Exception {
+      pullServer = new PullServer(conf);
+      if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
+          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
+      pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", PullServer);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+  }
+
+  @ChannelHandler.Sharable
+  class PullServer extends SimpleChannelUpstreamHandler {
+
+    private final TajoConf conf;
+    private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    private final Gson gson = new Gson();
+
+    public PullServer(TajoConf conf) throws IOException {
+      this.conf = conf;
+
+      // init local temporal dir
+      lDirAlloc.getAllLocalPathsToRead(".", conf);
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) throws Exception {
+      accepted.add(evt.getChannel());
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Current number of shuffle connections (%d)", accepted.size()));
+      }
+      super.channelOpen(ctx, evt);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+            throws Exception {
+
+      HttpRequest request = (HttpRequest) evt.getMessage();
+      Channel ch = evt.getChannel();
+
+      if (request.getMethod() == HttpMethod.DELETE) {
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        ch.write(response).addListener(ChannelFutureListener.CLOSE);
+
+        clearIndexCache(request.getUri());
+        return;
+      } else if (request.getMethod() != HttpMethod.GET) {
+        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+        return;
+      }
+
+      // Parsing the URL into key-values
+      try {
+        final PullServerParams params = new PullServerParams(request.getUri());
+        if (PullServerUtil.isChunkRequest(params.requestType())) {
+          handleChunkRequest(ctx, request, params);
+        } else {
+          handleMetaRequest(ctx, request, params);
+        }
+      } catch (Throwable e) {
+        LOG.error("Failed to handle request " + request.getUri());
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
+      }
+    }
+
+    /**
+     * Upon a request from a TajoWorker, this method clears the index cache entries used for fetching the data
+     * of an execution block. It is called whenever an execution block completes.
+     *
+     * @param uri query URI which indicates the execution block id
+     * @throws IOException
+     * @throws InvalidURLException
+     */
+    public void clearIndexCache(String uri)
+        throws IOException, InvalidURLException {
+      // Simply parse the given uri
+      String[] tokens = uri.split("=");
+      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+        throw new IllegalArgumentException("invalid params: " + uri);
+      }
+      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+      String queryId = ebId.getQueryId().toString();
+      String ebSeqId = Integer.toString(ebId.getId());
+      List<IndexCacheKey> removed = new ArrayList<>();
+      synchronized (indexReaderCache) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        indexReaderCache.invalidateAll(removed);
+      }
+      removed.clear();
+      synchronized (waitForRemove) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        for (IndexCacheKey eachKey : removed) {
+          waitForRemove.remove(eachKey);
+        }
+      }
+    }
+
+    private void handleMetaRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
+        throws IOException, ExecutionException {
+      final List<String> jsonMetas;
+      try {
+        jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
+      } catch (ExecutionException e) {
+        // There are some problems in index cache
+        throw new TajoInternalError(e.getCause());
+      }
+
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
+      response.setContent(ChannelBuffers.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+      response.setHeader(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+      HttpHeaders.setContentLength(response, response.getContent().readableBytes());
+      if (HttpHeaders.isKeepAlive(request)) {
+        response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+      }
+      ChannelFuture writeFuture = ctx.getChannel().write(response);
+
+      // Decide whether to close the connection or not.
+      if (!HttpHeaders.isKeepAlive(request)) {
+        // Close the connection when the whole content is written out.
+        writeFuture.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
+
+    private void handleChunkRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
+        throws IOException {
+      final List<FileChunk> chunks;
+      try {
+        chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
+      } catch (ExecutionException e) {
+        // A problem occurred in the index cache
+        throw new TajoInternalError(e.getCause());
+      }
+
+      // Write the content.
+      final Channel ch = ctx.getChannel();
+      if (chunks.size() == 0) {
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+        if (!HttpHeaders.isKeepAlive(request)) {
+          ch.write(response).addListener(ChannelFutureListener.CLOSE);
+        } else {
+          response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+          ch.write(response);
+        }
+      } else {
+        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+        ChannelFuture writeFuture = null;
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
+        long totalSize = 0;
+        StringBuilder sb = new StringBuilder();
+        for (FileChunk chunk : file) {
+          totalSize += chunk.length();
+          sb.append(Long.toString(chunk.length())).append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
+        HttpHeaders.setContentLength(response, totalSize);
+
+        if (HttpHeaders.isKeepAlive(request)) {
+          response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+        }
+        // Write the initial line and the header.
+        writeFuture = ch.write(response);
+
+        for (FileChunk chunk : file) {
+          writeFuture = sendFile(ctx, chunk);
+          if (writeFuture == null) {
+            sendError(ctx, HttpResponseStatus.NOT_FOUND);
+            return;
+          }
+        }
+
+        // Decide whether to close the connection or not.
+        if (!HttpHeaders.isKeepAlive(request)) {
+          // Close the connection when the whole content is written out.
+          writeFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+      }
+    }
+
+    private ChannelFuture sendFile(ChannelHandlerContext ctx,
+                                   FileChunk file) throws IOException {
+      Channel ch = ctx.getChannel();
+      RandomAccessFile spill = null;
+      ChannelFuture writeFuture;
+      try {
+        spill = new RandomAccessFile(file.getFile(), "r");
+        if (ctx.getPipeline().get(SslHandler.class) == null) {
+          final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
+              file.startOffset(), file.length(), manageOsCache, readaheadLength,
+              readaheadPool, file.getFile().getAbsolutePath());
+          writeFuture = ch.write(filePart);
+          writeFuture.addListener(new FileCloseListener(filePart));
+        } else {
+          // HTTPS cannot be done with zero copy.
+          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+              file.startOffset(), file.length(), sslFileBufferSize,
+              manageOsCache, readaheadLength, readaheadPool,
+              file.getFile().getAbsolutePath());
+          writeFuture = ch.write(chunk);
+        }
+      } catch (FileNotFoundException e) {
+        LOG.fatal(file.getFile() + " not found");
+        return null;
+      } catch (Throwable e) {
+        LOG.fatal("error while sending a file: ", e);
+        if (spill != null) {
+          // should close an open file
+          LOG.warn("Closing the file " + file.getFile().getAbsolutePath());
+          spill.close();
+        }
+        return null;
+      }
+      metrics.shuffleConnections.incr();
+      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+      // Put shuffle version into http header
+      ChannelBuffer content = ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8);
+      response.setContent(content);
+      response.setHeader(Names.CONTENT_LENGTH, content.writerIndex());
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      LOG.error(cause.getMessage(), cause);
+      if (ch.isOpen()) {
+        ch.close();
+      }
+    }
+  }
+
+  // Temporary space to hold index readers until all in-flight index lookup operations complete
+  private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+
+  // RemovalListener is triggered when an item is removed from the index reader cache.
+  // It closes index readers that are no longer in use.
+  // Readers still in use are moved to the waitForRemove map until the remaining operations complete.
+  private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
+    BSTIndexReader reader = removal.getValue();
+    if (reader.getReferenceNum() == 0) {
+      try {
+        reader.close(); // tear down properly
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      waitForRemove.remove(removal.getKey());
+    } else {
+      waitForRemove.put(removal.getKey(), reader);
+    }
+  };
+}
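
On the consumer side, the metadata produced by serializeMetaData() is just a 4-byte port. A hedged sketch of the inverse operation (this helper is not part of the patch):

// Hypothetical helper: recover the pull-server port from the aux-service metadata.
public static int deserializeMetaData(ByteBuffer meta) {
  // duplicate() leaves the caller's buffer position untouched
  return meta.duplicate().getInt();
}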

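handleChunkRequest() above advertises the per-chunk lengths in one comma-separated header and sets Content-Length to their sum, so a fetcher can split the concatenated body back into chunks. A minimal client-side sketch, assuming a Netty 4 style response object as used by the fetchers below:

String header = response.headers().get(PullServerConstants.CHUNK_LENGTH_HEADER_NAME);
long offset = 0;
for (String each : header.split(",")) {
  long len = Long.parseLong(each);
  // the bytes in [offset, offset + len) form one chunk of the downloaded file
  offset += len;
}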

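The removal listener above parks still-referenced readers in waitForRemove; clearIndexCache() is what finally force-closes them when an execution block ends. A hedged sketch of the release path for a single key, using only the APIs shown here (the trigger point is an assumption, not code from the patch):

BSTIndexReader reader = waitForRemove.get(key);
if (reader != null && reader.getReferenceNum() == 0) {
  try {
    reader.close(); // safe now: no lookups in flight
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  waitForRemove.remove(key);
}
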
[3/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
new file mode 100644
index 0000000..e346bdf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.rpc.NettyUtils;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LocalFetcher retrieves locally stored data. Its behavior differs depending on whether the pull server runs
+ * internally or externally.
+ *
+ * <ul>
+ *   <li>When an internal pull server is running, local fetchers can retrieve data directly.</li>
+ *   <li>When an external pull server is running,
+ *     <ul>
+ *       <li>if the shuffle type is hash, local fetchers can still retrieve data directly;</li>
+ *       <li>if the shuffle type is range, local fetchers need to get the meta information of the data via HTTP.
+ *       Once the meta information is retrieved, they can read the data directly.</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+public class LocalFetcher extends AbstractFetcher {
+
+  private final static Log LOG = LogFactory.getLog(LocalFetcher.class);
+
+  private final TajoPullServerService pullServerService;
+
+  private final String host;
+  private int port;
+  private final Bootstrap bootstrap;
+  private final int maxUrlLength;
+  private final List<FileChunkMeta> chunkMetas = new ArrayList<>();
+  private final String tableName;
+  private final FileSystem localFileSystem;
+  private final LocalDirAllocator localDirAllocator;
+
+  @VisibleForTesting
+  public LocalFetcher(TajoConf conf, URI uri, String tableName) throws IOException {
+    super(conf, uri);
+    this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    this.tableName = tableName;
+    this.localFileSystem = new LocalFileSystem();
+    this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    this.pullServerService = null;
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new Bootstrap()
+        .group(
+            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+            conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+        .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  public LocalFetcher(TajoConf conf, URI uri, ExecutionBlockContext executionBlockContext, String tableName) {
+    super(conf, uri);
+    this.localFileSystem = executionBlockContext.getLocalFS();
+    this.localDirAllocator = executionBlockContext.getLocalDirAllocator();
+    this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    this.tableName = tableName;
+
+    Optional<TajoPullServerService> optional = executionBlockContext.getSharedResource().getPullServerService();
+    if (optional.isPresent()) {
+      // local pull server service
+      this.pullServerService = optional.get();
+      this.host = null;
+      this.bootstrap = null;
+
+    } else if (PullServerUtil.useExternalPullServerService(conf)) {
+      // external pull server service
+      pullServerService = null;
+
+      String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+      this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+      this.port = uri.getPort();
+      if (port == -1) {
+        if (scheme.equalsIgnoreCase("http")) {
+          this.port = 80;
+        } else if (scheme.equalsIgnoreCase("https")) {
+          this.port = 443;
+        }
+      }
+
+      bootstrap = new Bootstrap()
+          .group(
+              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                  conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+              conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+          .option(ChannelOption.TCP_NODELAY, true);
+    } else {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new TajoInternalError("Pull server service is not initialized");
+    }
+  }
+
+  @Override
+  public List<FileChunk> get() throws IOException {
+    this.startTime = System.currentTimeMillis();
+    return pullServerService != null ? getWithInternalPullServer() : getWithExternalPullServer();
+  }
+
+  private List<FileChunk> getWithInternalPullServer() throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+    PullServerParams params = new PullServerParams(uri.toString());
+    try {
+      fileChunks.addAll(pullServerService.getFileChunks(conf, localDirAllocator, params));
+    } catch (ExecutionException e) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new TajoInternalError(e);
+    }
+    fileChunks.stream().forEach(c -> c.setEbId(tableName));
+    endFetch(FetcherState.FETCH_DATA_FINISHED);
+    if (fileChunks.size() > 0) {
+      fileLen = fileChunks.get(0).length();
+      fileNum = 1;
+    } else {
+      fileNum = 0;
+      fileLen = 0;
+    }
+    return fileChunks;
+  }
+
+  private List<FileChunk> getWithExternalPullServer() throws IOException {
+    final PullServerParams params = new PullServerParams(uri.toString());
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+
+    if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+      return getChunksForRangeShuffle(params, queryBaseDir);
+    } else if (PullServerUtil.isHashShuffle(params.shuffleType())) {
+      return getChunksForHashShuffle(params, queryBaseDir);
+    } else {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IllegalArgumentException("unknown shuffle type: " + params.shuffleType());
+    }
+  }
+
+  private List<FileChunk> getChunksForHashShuffle(final PullServerParams params, final Path queryBaseDir)
+      throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+    final String partId = params.partId();
+    final long offset = params.offset();
+    final long length = params.length();
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    if (!localDirAllocator.ifExists(partPath.toString(), conf)) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+    }
+    final Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(partPath.toString(), conf));
+    final File file = new File(path.toUri());
+    final long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+    final long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+    if (startPos >= file.length()) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+    }
+    if (readLen > 0) {
+      final FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunk.setEbId(tableName);
+      chunk.setFromRemote(false);
+      fileChunks.add(chunk);
+      fileLen = chunk.length();
+      fileNum = 1;
+    }
+
+    endFetch(FetcherState.FETCH_DATA_FINISHED);
+    return fileChunks;
+  }
+
+  private List<FileChunk> getChunksForRangeShuffle(final PullServerParams params, final Path queryBaseDir)
+      throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+
+    if (state == FetcherState.FETCH_INIT) {
+      final ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer();
+      bootstrap.handler(initializer);
+    }
+
+    this.state = FetcherState.FETCH_META_FETCHING;
+    ChannelFuture future = null;
+    try {
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+          .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().channel();
+      if (!future.isSuccess()) {
+        endFetch(FetcherState.FETCH_FAILED);
+        throw new IOException(future.cause());
+      }
+
+      for (URI eachURI : createChunkMetaRequestURIs(host, port, params)) {
+        String query = eachURI.getPath()
+            + (eachURI.getRawQuery() != null ? "?" + eachURI.getRawQuery() : "");
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+        request.headers().set(HttpHeaders.Names.HOST, host);
+        request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+        request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Status: " + getState() + ", URI:" + eachURI);
+        }
+        // Send the HTTP request.
+        channel.writeAndFlush(request);
+      }
+      // Wait for the server to close the connection; throws an exception if the fetch failed
+      channel.closeFuture().syncUninterruptibly();
+
+      if (!state.equals(FetcherState.FETCH_META_FINISHED)) {
+        endFetch(FetcherState.FETCH_FAILED);
+      } else {
+        state = FetcherState.FETCH_DATA_FETCHING;
+        fileLen = fileNum = 0;
+        for (FileChunkMeta eachMeta : chunkMetas) {
+          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachMeta.getTaskId(), "output");
+          if (!localDirAllocator.ifExists(outputPath.toString(), conf)) {
+            LOG.warn("Range shuffle - file not exist. " + outputPath);
+            continue;
+          }
+          Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(outputPath.toString(), conf));
+          File file = new File(URI.create(path.toUri() + "/output"));
+          FileChunk chunk = new FileChunk(file, eachMeta.getStartOffset(), eachMeta.getLength());
+          chunk.setEbId(tableName);
+          fileChunks.add(chunk);
+          fileLen += chunk.length();
+          fileNum++;
+        }
+        endFetch(FetcherState.FETCH_DATA_FINISHED);
+      }
+
+      return fileChunks;
+    } finally {
+      if(future != null && future.channel().isOpen()){
+        // Close the channel to exit.
+        future.channel().close().awaitUninterruptibly();
+      }
+    }
+  }
+
+  public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+    private int length = -1;
+    private int totalReceivedContentLength = 0;
+    private byte[] buf;
+    private final Gson gson = new Gson();
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      messageReceiveCount++;
+      if (msg instanceof HttpResponse) {
+        try {
+          HttpResponse response = (HttpResponse) msg;
+
+          StringBuilder sb = new StringBuilder();
+          if (LOG.isDebugEnabled()) {
+            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+                .append(response.getProtocolVersion()).append(", HEADER: ");
+          }
+          if (!response.headers().names().isEmpty()) {
+            for (String name : response.headers().names()) {
+              for (String value : response.headers().getAll(name)) {
+                if (LOG.isDebugEnabled()) {
+                  sb.append(name).append(" = ").append(value);
+                }
+                if (this.length == -1 && name.equals("Content-Length")) {
+                  this.length = Integer.parseInt(value);
+                  if (buf == null || buf.length < this.length) {
+                    buf = new byte[this.length];
+                  }
+                }
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sb.toString());
+          }
+
+          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
+            LOG.warn("There are no data corresponding to the request");
+            length = 0;
+            return;
+          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+            LOG.error(response.getStatus().reasonPhrase());
+            state = TajoProtos.FetcherState.FETCH_FAILED;
+            return;
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+
+      if (msg instanceof HttpContent) {
+        HttpContent httpContent = (HttpContent) msg;
+        ByteBuf content = httpContent.content();
+
+        if (state != FetcherState.FETCH_FAILED) {
+          try {
+            if (content.isReadable()) {
+              int contentLength = content.readableBytes();
+              content.readBytes(buf, totalReceivedContentLength, contentLength);
+              totalReceivedContentLength += contentLength;
+            }
+
+            if (msg instanceof LastHttpContent) {
+              if (totalReceivedContentLength == length) {
+                state = FetcherState.FETCH_META_FINISHED;
+
+                List<String> jsonMetas = gson.fromJson(new String(buf), List.class);
+                for (String eachJson : jsonMetas) {
+                  FileChunkMeta meta = gson.fromJson(eachJson, FileChunkMeta.class);
+                  chunkMetas.add(meta);
+                }
+                totalReceivedContentLength = 0;
+                length = -1;
+              } else {
+                endFetch(FetcherState.FETCH_FAILED);
+                throw new IOException("Invalid fetch meta length: " + totalReceivedContentLength + ", expected length: "
+                    + length);
+              }
+            }
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          } finally {
+            ReferenceCountUtil.release(msg);
+          }
+        } else {
+          // http content contains the reason why the fetch failed.
+          LOG.error(content.toString(Charset.defaultCharset()));
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause.getMessage(), cause);
+      } else {
+        LOG.error("Fetch failed :", cause);
+      }
+
+      // this fetch will be retried
+      finishTime = System.currentTimeMillis();
+      state = TajoProtos.FetcherState.FETCH_FAILED;
+      ctx.close();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      if(getState() == FetcherState.FETCH_INIT || getState() == FetcherState.FETCH_META_FETCHING){
+        // channel was closed before the fetch could complete
+        finishTime = System.currentTimeMillis();
+        LOG.error("Channel closed by peer: " + ctx.channel());
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+      }
+
+      super.channelUnregistered(ctx);
+    }
+  }
+
+  public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
+
+      int maxChunkSize = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+      pipeline.addLast("handler", new HttpClientHandler());
+    }
+  }
+
+  private List<URI> createChunkMetaRequestURIs(String pullServerAddr, int pullServerPort, PullServerParams params) {
+    final PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder(pullServerAddr, pullServerPort, maxUrlLength);
+    builder.setRequestType(PullServerConstants.META_REQUEST_PARAM_STRING)
+        .setQueryId(params.queryId())
+        .setShuffleType(params.shuffleType())
+        .setEbId(params.ebId())
+        .setPartId(params.partId());
+
+    if (params.contains(Param.OFFSET)) {
+      builder.setOffset(params.offset()).setLength(params.length());
+    }
+
+    if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+      builder.setStartKeyBase64(params.startKey())
+          .setEndKeyBase64(params.endKey())
+          .setLastInclude(params.last());
+    }
+
+    if (params.contains(Param.TASK_ID)) {
+      builder.setTaskAttemptIds(params.taskAttemptIds());
+    }
+    return builder.build(true);
+  }
+}
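
For a quick sense of the API, a hedged usage sketch of the @VisibleForTesting constructor; the URI below is a placeholder, and the real query string is built by PullServerRequestURIBuilder:

TajoConf conf = new TajoConf();
URI uri = URI.create("http://localhost:28080/?..."); // placeholder host, port, and parameters
LocalFetcher fetcher = new LocalFetcher(conf, uri, "t1");
// hash shuffle: chunks are read directly; range shuffle: metadata is fetched over HTTP first
List<FileChunk> chunks = fetcher.get();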

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
new file mode 100644
index 0000000..9a8a70b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NettyUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * RemoteFetcher fetches data from a given URI via HTTP and stores it in a specific
+ * file. It aims at asynchronous and efficient data transmission.
+ */
+public class RemoteFetcher extends AbstractFetcher {
+
+  private final static Log LOG = LogFactory.getLog(RemoteFetcher.class);
+
+  private final String host;
+  private int port;
+
+  private final Bootstrap bootstrap;
+  private final List<Long> chunkLengths = new ArrayList<>();
+
+  public RemoteFetcher(TajoConf conf, URI uri, FileChunk chunk) {
+    super(conf, uri, chunk);
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new Bootstrap()
+        .group(
+            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+            conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+        .option(ChannelOption.SO_RCVBUF, 1048576) // 1 MB receive buffer
+        .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  @Override
+  public List<FileChunk> get() throws IOException {
+    List<FileChunk> fileChunks = new ArrayList<>();
+
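+    // The channel initializer is installed only on the first attempt; a retried fetch reuses the handler already set on the bootstrap.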
+    if (state == FetcherState.FETCH_INIT) {
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
+    }
+
+    this.startTime = System.currentTimeMillis();
+    this.state = FetcherState.FETCH_DATA_FETCHING;
+    ChannelFuture future = null;
+    try {
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().channel();
+      if (!future.isSuccess()) {
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+        throw new IOException(future.cause());
+      }
+
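+      // Rebuild the request path, preserving the original query string if present.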
+      String query = uri.getPath()
+          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+      // Prepare the HTTP request.
+      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+      request.headers().set(HttpHeaders.Names.HOST, host);
+      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
+      }
+      // Send the HTTP request.
+      channel.writeAndFlush(request);
+
+      // Wait for the server to close the connection; throws an exception on failure.
+      channel.closeFuture().syncUninterruptibly();
+
+      fileChunk.setLength(fileChunk.getFile().length());
+
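+      // Split the downloaded file into logical chunks using the lengths reported in the chunk-length header.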
+      long start = 0;
+      for (Long eachChunkLength : chunkLengths) {
+        if (eachChunkLength == 0) continue;
+        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+        chunk.setEbId(fileChunk.getEbId());
+        chunk.setFromRemote(true);
+        fileChunks.add(chunk);
+        start += eachChunkLength;
+      }
+      return fileChunks;
+
+    } finally {
+      if(future != null && future.channel().isOpen()){
+        // Close the channel to exit.
+        future.channel().close().awaitUninterruptibly();
+      }
+
+      this.finishTime = System.currentTimeMillis();
+      long elapsedMills = finishTime - startTime;
+      String transferSpeed;
+      if(elapsedMills > 1000) {
+        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
+        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
+      } else {
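+        // elapsed time is under a second; approximate the rate with the total byte count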
+        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
+      }
+
+      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
+          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
+    }
+  }
+
+  public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    private long length = -1;
+    private long totalReceivedContentLength = 0; // long, since the fetched data may exceed 2 GB
+
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+      this.raf = new RandomAccessFile(file, "rw");
+      this.fc = raf.getChannel();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      messageReceiveCount++;
+      if (msg instanceof HttpResponse) {
+        try {
+          HttpResponse response = (HttpResponse) msg;
+
+          StringBuilder sb = new StringBuilder();
+          if (LOG.isDebugEnabled()) {
+            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+                .append(response.getProtocolVersion()).append(", HEADER: ");
+          }
+          if (!response.headers().names().isEmpty()) {
+            for (String name : response.headers().names()) {
+              for (String value : response.headers().getAll(name)) {
+                if (LOG.isDebugEnabled()) {
+                  sb.append(name).append(" = ").append(value);
+                }
+                if (this.length == -1 && name.equals("Content-Length")) {
+                  this.length = Long.parseLong(value);
+                }
+              }
+            }
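+            // The pull server reports the length of every chunk as a comma-separated value in a single header.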
+            if (response.headers().contains(PullServerConstants.CHUNK_LENGTH_HEADER_NAME)) {
+              String stringOffset = response.headers().get(PullServerConstants.CHUNK_LENGTH_HEADER_NAME);
+
+              for (String eachSplit : stringOffset.split(",")) {
+                chunkLengths.add(Long.parseLong(eachSplit));
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sb.toString());
+          }
+
+          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
+            LOG.warn("There are no data corresponding to the request");
+            length = 0;
+            return;
+          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+            LOG.error(response.getStatus().reasonPhrase(), response.getDecoderResult().cause());
+            endFetch(FetcherState.FETCH_FAILED);
+            return;
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+
+      if (msg instanceof HttpContent) {
+        HttpContent httpContent = (HttpContent) msg;
+        ByteBuf content = httpContent.content();
+
+        if (state != FetcherState.FETCH_FAILED) {
+          try {
+            if (content.isReadable()) {
+              totalReceivedContentLength += content.readableBytes();
+              content.readBytes(fc, content.readableBytes());
+            }
+
+            if (msg instanceof LastHttpContent) {
+              if (raf != null) {
+                fileLen = file.length();
+                fileNum = 1;
+              }
+
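+              // Verify that the total received bytes match the Content-Length header before finishing the fetch.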
+              if (totalReceivedContentLength == length) {
+                endFetch(FetcherState.FETCH_DATA_FINISHED);
+              } else {
+                endFetch(FetcherState.FETCH_FAILED);
+                throw new IOException("Invalid fetch length: " + totalReceivedContentLength + ", but expected " + length);
+              }
+              IOUtils.cleanup(LOG, fc, raf);
+            }
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          } finally {
+            ReferenceCountUtil.release(msg);
+          }
+        } else {
+          // The HTTP content contains the reason why the fetch failed.
+          LOG.error(content.toString(Charset.defaultCharset()));
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause.getMessage(), cause);
+      } else {
+        LOG.error("Fetch failed :", cause);
+      }
+
+      // this fetch will be retried
+      IOUtils.cleanup(LOG, fc, raf);
+      endFetch(FetcherState.FETCH_FAILED);
+      ctx.close();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      if(getState() != FetcherState.FETCH_DATA_FINISHED){
+        // the channel was closed before the fetch could complete
+        endFetch(FetcherState.FETCH_FAILED);
+        LOG.error("Channel closed by peer: " + ctx.channel());
+      }
+      IOUtils.cleanup(LOG, fc, raf);
+
+      super.channelUnregistered(ctx);
+    }
+  }
+
+  public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+    private final File file;
+
+    public HttpClientChannelInitializer(File file) {
+      this.file = file;
+    }
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
+
+      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+      pipeline.addLast("handler", new HttpClientHandler(file));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index de337ae..6296bb0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.metrics.Node;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.pullserver.PullServerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.QueryMaster;
 import org.apache.tajo.querymaster.QueryMasterManagerService;
@@ -64,7 +65,6 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
@@ -144,7 +144,12 @@ public class TajoWorker extends CompositeService {
     queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
     addIfService(queryMasterManagerService);
 
-    this.taskManager = new TaskManager(dispatcher, workerContext);
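+    // Start an embedded pull server only when no external pull server (standalone or Yarn auxiliary service) is used.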
+    if (!PullServerUtil.useExternalPullServerService(systemConf)) {
+      pullService = new TajoPullServerService();
+      addIfService(pullService);
+    }
+
+    this.taskManager = new TaskManager(dispatcher, workerContext, pullService);
     addService(taskManager);
 
     this.taskExecutor = new TaskExecutor(workerContext);
@@ -158,21 +163,16 @@ public class TajoWorker extends CompositeService {
     addService(new NodeStatusUpdater(workerContext));
 
     int httpPort = 0;
-    if(!TajoPullServerService.isStandalone()) {
-      pullService = new TajoPullServerService();
-      addIfService(pullService);
-    }
-
     if (!systemConf.getBoolVar(ConfVars.$TEST_MODE)) {
       httpPort = initWebServer();
     }
 
     super.serviceInit(conf);
 
-    int pullServerPort;
+    int pullServerPort = systemConf.getIntVar(ConfVars.PULLSERVER_PORT);
     if(pullService != null){
       pullServerPort = pullService.getPort();
-    } else {
+    } else if (TajoPullServerService.isStandalone()) {
       pullServerPort = getStandAlonePullServerPort();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 19d5da4..4a5b0b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -54,5 +54,5 @@ public interface Task {
 
   TaskHistory createTaskHistory();
 
-  List<Fetcher> getFetchers();
+  List<AbstractFetcher> getFetchers();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 55eb02a..920dfe5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
@@ -53,7 +54,6 @@ import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.rpc.NullCallback;
@@ -84,7 +84,7 @@ public class TaskImpl implements Task {
   private final Path taskDir;
 
   private final TaskAttemptContext context;
-  private List<Fetcher> fetcherRunners;
+  private List<AbstractFetcher> fetcherRunners;
   private LogicalNode plan;
   private PhysicalExec executor;
 
@@ -269,7 +269,7 @@ public class TaskImpl implements Task {
       return taskIdStr1.compareTo(taskIdStr2);
     });
 
-    for (Fetcher f : fetcherRunners) {
+    for (AbstractFetcher f : fetcherRunners) {
       fetcherExecutor.submit(new FetchRunner(context, f));
     }
   }
@@ -516,14 +516,14 @@ public class TaskImpl implements Task {
         taskHistory.setTotalFetchCount(fetcherRunners.size());
         int i = 0;
         FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
-        for (Fetcher fetcher : fetcherRunners) {
+        for (AbstractFetcher fetcher : fetcherRunners) {
           builder.setStartTime(fetcher.getStartTime());
           builder.setFinishTime(fetcher.getFinishTime());
           builder.setFileLength(fetcher.getFileLen());
           builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
           builder.setState(fetcher.getState());
           taskHistory.addFetcherHistory(builder.build());
-          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
+          if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) i++;
         }
         taskHistory.setFinishedFetchCount(i);
       }
@@ -534,7 +534,7 @@ public class TaskImpl implements Task {
     return taskHistory;
   }
 
-  public List<Fetcher> getFetchers() {
+  public List<AbstractFetcher> getFetchers() {
     return fetcherRunners;
   }
 
@@ -585,10 +585,10 @@ public class TaskImpl implements Task {
 
   private class FetchRunner implements Runnable {
     private final TaskAttemptContext ctx;
-    private final Fetcher fetcher;
+    private final AbstractFetcher fetcher;
     private int maxRetryNum;
 
-    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+    public FetchRunner(TaskAttemptContext ctx, AbstractFetcher fetcher) {
       this.ctx = ctx;
       this.fetcher = fetcher;
       this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
@@ -612,7 +612,7 @@ public class TaskImpl implements Task {
           }
           try {
             List<FileChunk> fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+            if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) {
               for (FileChunk eachFetch : fetched) {
                 if (eachFetch.getFile() != null) {
                   if (!eachFetch.fromRemote()) {
@@ -630,7 +630,7 @@ public class TaskImpl implements Task {
           retryNum++;
         }
       } finally {
-        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
+        if(fetcher.getState() == FetcherState.FETCH_DATA_FINISHED){
           fetcherFinished(ctx);
         } else {
           if (retryNum == maxRetryNum) {
@@ -669,8 +669,8 @@ public class TaskImpl implements Task {
     }
   }
 
-  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchProto> fetches) throws IOException {
+  private List<AbstractFetcher> getFetchRunners(TaskAttemptContext ctx,
+                                                List<FetchProto> fetches) throws IOException {
 
     if (fetches.size() > 0) {
       Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -681,7 +681,7 @@ public class TaskImpl implements Task {
       File storeDir;
       File defaultStoreFile;
       List<FileChunk> storeChunkList = new ArrayList<>();
-      List<Fetcher> runnerList = Lists.newArrayList();
+      List<AbstractFetcher> runnerList = Lists.newArrayList();
 
       for (FetchProto f : fetches) {
         storeDir = new File(inputDir.toString(), f.getName());
@@ -696,43 +696,16 @@ public class TaskImpl implements Task {
 
           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-
-            List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
-
-            for (FileChunk localChunk : localChunkCandidates) {
-              // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-              // So, we should skip and don't need to create storeChunk.
-              if (localChunk == null || localChunk.length() == 0) {
-                continue;
-              }
-
-              if (localChunk.getFile() != null && localChunk.startOffset() > -1) {
-                localChunk.setFromRemote(false);
-                localStoreChunkCount++;
-              } else {
-                localChunk = new FileChunk(defaultStoreFile, 0, -1);
-                localChunk.setFromRemote(true);
-              }
-              localChunk.setEbId(f.getName());
-              storeChunkList.add(localChunk);
-            }
-
+            localStoreChunkCount++;
+            runnerList.add(new LocalFetcher(systemConf, uri, executionBlockContext, f.getName()));
           } else {
+            // When intermediate data is actually fetched from a remote host, the chunk represents a
+            // complete file. Otherwise, it may represent either a complete file or only a part of it.
             FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
             remoteChunk.setFromRemote(true);
             remoteChunk.setEbId(f.getName());
-            storeChunkList.add(remoteChunk);
-          }
-
-          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
-          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-          for (FileChunk eachChunk : storeChunkList) {
-            Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
-            runnerList.add(fetcher);
+            runnerList.add(new RemoteFetcher(systemConf, uri, remoteChunk));
             i++;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
-            }
           }
         }
       }
@@ -745,96 +718,9 @@ public class TaskImpl implements Task {
     }
   }
 
-  private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
-    // Parse the URI
-
-    // Parsing the URL into key-values
-    final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
-
-    String partId = params.get("p").get(0);
-    String queryId = params.get("qid").get(0);
-    String shuffleType = params.get("type").get(0);
-    String sid =  params.get("sid").get(0);
-
-    final List<String> taskIdList = params.get("ta");
-    final List<String> offsetList = params.get("offset");
-    final List<String> lengthList = params.get("length");
-
-    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-          + ", taskIds=" + taskIdList);
-    }
-
-    // The working directory of Tajo worker for each query, including stage
-    Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
-
-    List<FileChunk> chunkList = new ArrayList<>();
-    // If the stage requires a range shuffle
-    if (shuffleType.equals("r")) {
-
-      final String startKey = params.get("start").get(0);
-      final String endKey = params.get("end").get(0);
-      final boolean last = params.get("final") != null;
-      final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
-
-      long before = System.currentTimeMillis();
-      for (String eachTaskId : taskIds) {
-        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
-        if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
-          LOG.warn("Range shuffle - file not exist. " + outputPath);
-          continue;
-        }
-        Path path = executionBlockContext.getLocalFS().makeQualified(
-            executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
-
-        try {
-          FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
-          chunkList.add(chunk);
-        } catch (Throwable t) {
-          LOG.error(t.getMessage(), t);
-          throw new IOException(t.getCause());
-        }
-      }
-      long after = System.currentTimeMillis();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Index lookup time: " + (after - before) + " ms");
-      }
-
-      // If the stage requires a hash shuffle or a scattered hash shuffle
-    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-      Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
-
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
-        throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
-      File file = new File(path.toUri());
-      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
-      if (startPos >= file.length()) {
-        throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-      }
-      FileChunk chunk = new FileChunk(file, startPos, readLen);
-      chunkList.add(chunk);
-
-    } else {
-      throw new IOException("Unknown shuffle type");
-    }
-
-    return chunkList;
-  }
-
   public static Path getTaskAttemptDir(TaskAttemptId quid) {
-    Path workDir =
-        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-            String.valueOf(quid.getTaskId().getId()),
-            String.valueOf(quid.getId()));
-    return workDir;
+    return StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+        String.valueOf(quid.getTaskId().getId()),
+        String.valueOf(quid.getId()));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 61174b3..efdda9a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -30,6 +30,7 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.RpcClientManager;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.tajo.ResourceProtos.*;
@@ -57,12 +59,19 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
   private final Dispatcher dispatcher;
   private TaskExecutor executor;
   private final Properties rpcParams;
+  private final TajoPullServerService pullServerService;
 
-  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){
-    this(dispatcher, workerContext, null);
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    this(dispatcher, workerContext, null, null);
   }
 
-  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor) {
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext,
+                     TajoPullServerService pullServerService) {
+    this(dispatcher, workerContext, null, pullServerService);
+  }
+
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor,
+                     TajoPullServerService pullServerService) {
     super(TaskManager.class.getName());
 
     this.dispatcher = dispatcher;
@@ -70,6 +79,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
     this.executionBlockContextMap = Maps.newHashMap();
     this.executor = executor;
     this.rpcParams = RpcParameterFactory.get(this.workerContext.getConf());
+    this.pullServerService = pullServerService;
   }
 
   @Override
@@ -124,7 +134,8 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
       stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
 
       ExecutionBlockContextResponse contextProto = callback.get();
-      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
+      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client,
+          pullServerService);
 
       context.init();
       return context;

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index 88b7c24..3eab5cd 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.Fetcher" %>
+<%@ page import="org.apache.tajo.worker.AbstractFetcher" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="org.apache.tajo.worker.Task" %>
 <%@ page import="org.apache.tajo.worker.TaskHistory" %>
@@ -161,7 +161,7 @@
             <th>URI</th>
         </tr>
         <%
-            for (Fetcher eachFetcher : task.getFetchers()) {
+            for (AbstractFetcher eachFetcher : task.getFetchers()) {
         %>
         <tr>
             <td><%=index%>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index 652ab84..7280e1f 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -160,6 +160,9 @@
                       run mkdir -p share/jdbc-dist
                       run cp -r $ROOT/tajo-jdbc/target/tajo-jdbc-${project.version}-jar-with-dependencies.jar ./share/jdbc-dist/tajo-jdbc-${project.version}.jar
 
+                      run mkdir -p share/yarn-dist
+                      run cp -r $ROOT/tajo-yarn/target/tajo-yarn-${project.version}-jar-with-dependencies.jar ./share/yarn-dist/tajo-yarn-${project.version}.jar
+
                       run mkdir -p extlib
 
                       echo

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration.rst b/tajo-docs/src/main/sphinx/configuration.rst
index 8b0b8a2..bef49cb 100644
--- a/tajo-docs/src/main/sphinx/configuration.rst
+++ b/tajo-docs/src/main/sphinx/configuration.rst
@@ -10,6 +10,7 @@ Configuration
     configuration/tajo_master_configuration
     configuration/worker_configuration
     configuration/catalog_configuration
+    configuration/pullserver_configuration
     configuration/ha_configuration
     configuration/service_config_defaults
     configuration/tajo-site-xml

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
index e9d17eb..3835512 100644
--- a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
+++ b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
@@ -1,9 +1,9 @@
-*******************************************
+*************
 Cluster Setup
-*******************************************
+*************
 
 Fully Distributed Mode
-==========================================
+======================
 A fully distributed mode enables a Tajo instance to run on `Hadoop Distributed File System (HDFS) <http://wiki.apache.org/hadoop/HDFS>`_. In this mode, a number of Tajo workers run across a number of the physical nodes where HDFS data nodes run.
 
 
@@ -11,7 +11,7 @@ In this section, we explain how to setup the cluster mode.
 
 
 Settings
---------------------------------------------------------
+--------
 
 Please add the following configs to tajo-site.xml file:
 
@@ -43,7 +43,7 @@ Please add the following configs to tajo-site.xml file:
   </property>
 
 Workers
---------------------------------------------------------
+-------
 
 The file ``conf/workers`` lists all host names of workers, one per line.
 By default, this file contains the single entry ``localhost``.
@@ -59,7 +59,7 @@ For example: ::
   <ctrl + d>
 
 Make base directories and set permissions
---------------------------------------------------------
+-----------------------------------------
 
 If you want to know Tajo’s configuration in more detail, see Configuration page.
 Before launching the tajo, you should create the tajo root dir and set the permission as follows: ::
@@ -69,7 +69,7 @@ Before launching the tajo, you should create the tajo root dir and set the permi
 
 
 Launch a Tajo cluster
---------------------------------------------------------
+---------------------
 
 Then, execute ``start-tajo.sh`` ::
 
@@ -77,10 +77,10 @@ Then, execute ``start-tajo.sh`` ::
 
 .. note::
 
-  In default, each worker is set to very little resource capacity. In order to increase parallel degree, please read 
+  By default, each worker is set to very little resource capacity. In order to increase parallel degree, please read
   :doc:`/configuration/worker_configuration`.
 
 .. note::
 
-  In default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
+  By default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
new file mode 100644
index 0000000..68760fc
--- /dev/null
+++ b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
@@ -0,0 +1,75 @@
+*************************
+Pull Server Configuration
+*************************
+
+Pull servers are responsible for transmitting data among Tajo workers during shuffle phases. Tajo provides several modes
+for pull servers.
+
+Internal Mode (Default)
+=======================
+
+In the internal mode, each worker also acts as a pull server. As a result, workers must transmit data during the
+shuffle phase in addition to processing data during the processing phase. Since this is the default mode, no
+additional configuration is required.
+
+Standalone Mode
+===============
+
+Data shuffling sometimes requires a large amount of memory and CPU. This can slow down query processing because
+Tajo's query engine must contend with pull servers for limited resources. Tajo provides the standalone mode to
+avoid this unnecessary contention.
+
+In this mode, each pull server is executed as a separate process. To enable this mode, you need to add the following
+line to ``${TAJO_HOME}/conf/tajo-env.sh``.
+
+.. code-block:: sh
+
+  export TAJO_PULLSERVER_STANDALONE=true
+
+Then, you will see the following messages when you start up the Tajo cluster.
+
+.. code-block:: sh
+
+  Starting single TajoMaster
+  starting master, logging to ...
+  192.168.10.1: starting pullserver, logging to ...
+  192.168.10.1: starting worker, logging to ...
+  192.168.10.2: starting pullserver, logging to ...
+  192.168.10.2: starting worker, logging to ...
+  ...
+
+.. warning::
+
+  Currently, only a single pull server can run on each machine.
+
+Yarn Auxiliary Service Mode
+===========================
+
+You can run pull servers as one of Yarn's auxiliary services. To do so, you need to add the following configurations
+to ``${HADOOP_CONF}/yarn-site.xml``.
+
+.. code-block:: xml
+
+  <property>
+    <name>yarn.nodemanager.aux-services</name>
+    <value>mapreduce_shuffle,tajo_shuffle</value>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.aux-services.tajo_shuffle.class</name>
+    <value>org.apache.tajo.yarn.TajoPullServerService</value>
+  </property>
+
+  <property>
+    <name>tajo.pullserver.port</name>
+    <value>port number</value>
+  </property>
+
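+This mode presumably also requires enabling the new ``tajo.shuffle.yarn-service.enabled`` flag (``false`` by default)
+in ``tajo-site.xml``, so that each worker relies on the external pull server instead of launching an embedded one
+(see ``PullServerUtil.useExternalPullServerService``). A minimal sketch:
+
+.. code-block:: xml
+
+  <property>
+    <name>tajo.shuffle.yarn-service.enabled</name>
+    <value>true</value>
+  </property>
+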
+Optionally, you can add the configuration below to specify temporary directories. For details on this configuration,
+please refer to :doc:`/configuration/worker_configuration`.
+
+.. code-block:: xml
+
+  <property>
+    <name>tajo.worker.tmpdir.locations</name>
+    <value>/path/to/tajo/temporal/directory</value>
+  </property>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index ee1317f..4818983 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -832,6 +832,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-yarn</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-client</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -991,13 +996,6 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-server-tests</artifactId>
-        <version>${hadoop.version}</version>
-        <type>test-jar</type>
-        <scope>test</scope>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
         <version>${hadoop.version}</version>
         <exclusions>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 2a1a745..6c805ac 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -56,12 +56,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc-protobuf</artifactId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-      <scope>provided</scope>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
index 643d9e0..07f0e56 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -151,6 +151,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
   protected void deallocate() {
     if (readaheadRequest != null) {
       readaheadRequest.cancel();
+      readaheadRequest = null;
     }
     super.deallocate();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
index 9c3c523..7f97542 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -24,18 +24,9 @@ import io.netty.util.concurrent.GenericFutureListener;
 public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
 
   private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
 
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
+  public FileCloseListener(FadvisedFileRegion filePart) {
     this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
   }
 
   // TODO error handling; distinguish IO/connection failures,
@@ -46,8 +37,5 @@ public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
       filePart.transferSuccessful();
     }
     filePart.deallocate();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
new file mode 100644
index 0000000..74f96e7
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+public class PullServerConstants {
+
+  /**
+   * Pull server query parameters
+   */
+  public enum Param {
+    // Common params
+    REQUEST_TYPE("rtype"),  // can be one of 'm' for meta and 'c' for chunk.
+    SHUFFLE_TYPE("stype"),  // can be one of 'r', 'h', and 's'.
+    QUERY_ID("qid"),
+    EB_ID("sid"),
+    PART_ID("p"),
+    TASK_ID("ta"),
+    OFFSET("offset"),
+    LENGTH("length"),
+
+    // Range shuffle params
+    START("start"),
+    END("end"),
+    FINAL("final");
+
+    private String key;
+
+    Param(String key) {
+      this.key = key;
+    }
+
+    public String key() {
+      return key;
+    }
+  }
+
+  // Request types ----------------------------------------------------------
+
+  public static final String CHUNK_REQUEST_PARAM_STRING = "c";
+  public static final String META_REQUEST_PARAM_STRING = "m";
+
+  // Shuffle types ----------------------------------------------------------
+
+  public static final String RANGE_SHUFFLE_PARAM_STRING = "r";
+  public static final String HASH_SHUFFLE_PARAM_STRING = "h";
+  public static final String SCATTERED_HASH_SHUFFLE_PARAM_STRING = "s";
+
+  // HTTP header ------------------------------------------------------------
+
+  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+  // SSL configurations -----------------------------------------------------
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+      "tajo.pullserver.ssl.file.buffer.size";
+
+  // OS cache configurations ------------------------------------------------
+
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  // Prefetch configurations ------------------------------------------------
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  // Yarn service ID --------------------------------------------------------
+
+  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+  // Standalone pull server -------------------------------------------------
+  public static final String PULLSERVER_STANDALONE_ENV_KEY = "TAJO_PULLSERVER_STANDALONE";
+
+  public static final String PULLSERVER_SERVICE_NAME = "httpshuffle";
+}


[4/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

Posted by ji...@apache.org.
TAJO-2122: PullServer as an Auxiliary service of Yarn.

Closes #1001


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73ac4b87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73ac4b87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73ac4b87

Branch: refs/heads/master
Commit: 73ac4b87d0f7389b79be8a847e4215fc59befaff
Parents: 71193b2
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Apr 27 13:26:28 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Apr 27 13:26:28 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 pom.xml                                         |   3 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../org/apache/tajo/exception/ErrorUtil.java    |  14 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   8 +-
 tajo-core-tests/pom.xml                         |   4 +
 .../physical/TestProgressExternalSortExec.java  |   2 -
 .../apache/tajo/master/TestRepartitioner.java   |  10 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |   3 +-
 .../apache/tajo/worker/MockExecutionBlock.java  |  42 --
 .../tajo/worker/MockExecutionBlockContext.java  |  42 ++
 .../apache/tajo/worker/MockTaskExecutor.java    |   2 +-
 .../org/apache/tajo/worker/MockTaskManager.java |   3 +-
 .../org/apache/tajo/worker/TestFetcher.java     | 236 -------
 .../worker/TestFetcherWithTajoPullServer.java   | 437 ++++++++++++
 .../apache/tajo/worker/TestTaskExecutor.java    |   2 +-
 .../apache/tajo/querymaster/Repartitioner.java  |  96 +--
 .../org/apache/tajo/worker/AbstractFetcher.java |  89 +++
 .../tajo/worker/ExecutionBlockContext.java      |  10 +-
 .../worker/ExecutionBlockSharedResource.java    |  16 +
 .../java/org/apache/tajo/worker/Fetcher.java    | 356 ----------
 .../org/apache/tajo/worker/LocalFetcher.java    | 480 +++++++++++++
 .../org/apache/tajo/worker/RemoteFetcher.java   | 317 +++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  18 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 156 +----
 .../org/apache/tajo/worker/TaskManager.java     |  19 +-
 .../resources/webapps/worker/taskdetail.jsp     |   4 +-
 tajo-dist/pom.xml                               |   3 +
 tajo-docs/src/main/sphinx/configuration.rst     |   1 +
 .../main/sphinx/configuration/cluster_setup.rst |  18 +-
 .../configuration/pullserver_configuration.rst  |  75 ++
 tajo-project/pom.xml                            |  12 +-
 tajo-pullserver/pom.xml                         |   6 +-
 .../tajo/pullserver/FadvisedFileRegion.java     |   1 +
 .../tajo/pullserver/FileCloseListener.java      |  14 +-
 .../tajo/pullserver/PullServerConstants.java    |  93 +++
 .../apache/tajo/pullserver/PullServerUtil.java  | 688 ++++++++++++++++++-
 .../apache/tajo/pullserver/TajoPullServer.java  |   5 -
 .../tajo/pullserver/TajoPullServerService.java  | 683 +++++-------------
 .../tajo/pullserver/retriever/FileChunk.java    |   4 +-
 .../pullserver/retriever/FileChunkMeta.java     |  53 ++
 .../pullserver/retriever/IndexCacheKey.java     |  63 ++
 tajo-yarn/pom.xml                               | 265 +++++++
 .../apache/tajo/yarn/FadvisedChunkedFile.java   |  82 +++
 .../apache/tajo/yarn/FadvisedFileRegion.java    | 173 +++++
 .../org/apache/tajo/yarn/FileCloseListener.java |  41 ++
 .../apache/tajo/yarn/TajoPullServerService.java | 608 ++++++++++++++++
 49 files changed, 3794 insertions(+), 1470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 38aedda..b729cce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
+
     TAJO-2109: Implement Radix sort. (jihoon)
 
     TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d255652..71c062b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,12 +92,13 @@
     <module>tajo-sql-parser</module>
     <module>tajo-storage</module>
     <module>tajo-pullserver</module>
-    <module>tajo-dist</module>
+    <module>tajo-yarn</module>
     <module>tajo-thirdparty/asm</module>
     <module>tajo-cli</module>
     <module>tajo-metrics</module>
     <module>tajo-core-tests</module>
     <module>tajo-cluster-tests</module>
+    <module>tajo-dist</module>
   </modules>
 
   <build>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 4e7d236..b1a3306 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -162,7 +162,7 @@ public class TajoTestingCluster {
     conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
     conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
 
-    /** decrease Hbase thread and memory cache for testing */
+    /* decrease Hbase thread and memory cache for testing */
     //server handler
     conf.setInt("hbase.regionserver.handler.count", 5);
     //client handler

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2e2fb18..440af80 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -214,6 +214,7 @@ public class TajoConf extends Configuration {
     PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
     PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
         Validators.min("1")),
+    YARN_SHUFFLE_SERVICE_ENABLED("tajo.shuffle.yarn-service.enabled", false, Validators.bool()),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
index 9a71bd6..957b3d1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
@@ -34,12 +34,14 @@ public class ErrorUtil {
 
   public static Stacktrace.StackTrace convertStacktrace(Throwable t) {
     Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder();
-    for (StackTraceElement element : t.getStackTrace()) {
-      builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
-              .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
-              .setFunction(element.getClassName() + "::" + element.getMethodName())
-              .setLine(element.getLineNumber())
-      );
+    if (t != null) {
+      for (StackTraceElement element : t.getStackTrace()) {
+        builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
+            .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
+            .setFunction(element.getClassName() + "::" + element.getMethodName())
+            .setLine(element.getLineNumber())
+        );
+      }
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 1795107..78d48c4 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -49,9 +49,11 @@ enum TaskAttemptState {
 
 enum FetcherState {
   FETCH_INIT = 0;
-  FETCH_FETCHING = 1;
-  FETCH_FINISHED = 2;
-  FETCH_FAILED = 3;
+  FETCH_META_FETCHING = 1;
+  FETCH_META_FINISHED = 2;
+  FETCH_DATA_FETCHING = 3;
+  FETCH_DATA_FINISHED = 4;
+  FETCH_FAILED = 5;
 }
 
 message WorkerConnectionInfoProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index b12642a..3554df4 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -174,6 +174,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-yarn</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index eeb179f..51cd5ea 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -173,13 +173,11 @@ public class TestProgressExternalSortExec {
     while ((tuple = exec.next()) != null) {
       if (cnt == 0) {
         initProgress = exec.getProgress();
-        System.out.println(initProgress);
         assertTrue(initProgress > 0.5f && initProgress < 1.0f);
       }
 
       if (cnt == testDataStats.getNumRows() / 2) {
         float progress = exec.getProgress();
-        System.out.println(progress);
         assertTrue(progress > initProgress);
       }
       curVal = tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a13a750..abec6a0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -20,12 +20,13 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.TestTajoIds;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
@@ -88,12 +89,11 @@ public class TestRepartitioner {
      assertEquals(1, uris.size());   // In hash shuffle, the fetcher returns only one URI per partition.
 
       URI uri = uris.get(0);
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(uri).parameters();
+      final PullServerParams params = new PullServerParams(uri);
 
       assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
-      assertEquals("h", params.get("type").get(0));
-      assertEquals("" + sid.getId(), params.get("sid").get(0));
+      assertEquals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING, params.shuffleType());
+      assertEquals("" + sid.getId(), params.ebId());
     }
 
     Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =

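The assertions now go through PullServerUtil.PullServerParams, which wraps the raw query-string map behind typed accessors, instead of Netty's QueryStringDecoder. A minimal sketch under the parameter names used elsewhere in this patch (the URI itself is illustrative):

    // Hypothetical fetch URI: hash shuffle, execution block 1, partition 1.
    URI uri = URI.create("http://127.0.0.1:8080/?qid=q_1&sid=1&p=1&type=h");
    PullServerParams params = new PullServerParams(uri);
    assert PullServerConstants.HASH_SHUFFLE_PARAM_STRING.equals(params.shuffleType());
    assert "1".equals(params.ebId());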
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index ac5efd9..8d33dbc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -26,7 +26,6 @@ import org.apache.tajo.*;
 import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.parser.sql.SQLAnalyzer;
@@ -269,7 +268,7 @@ public class TestKillQuery {
       }
     };
 
-    ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
+    ExecutionBlockContext context = new MockExecutionBlockContext(workerContext, requestProtoBuilder.build()) {
       @Override
       public Path createBaseDir() throws IOException {
         return new Path("test");

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
deleted file mode 100644
index cbc4312..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
-import org.apache.tajo.TaskAttemptId;
-
-import java.io.IOException;
-
-public class MockExecutionBlock extends ExecutionBlockContext {
-
-  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
-                            ExecutionBlockContextResponse request) throws IOException {
-    super(workerContext, request, null);
-  }
-
-  @Override
-  public void init() throws Throwable {
-    //skip
-  }
-
-  @Override
-  public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
new file mode 100644
index 0000000..b64ab9b
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+
+public class MockExecutionBlockContext extends ExecutionBlockContext {
+
+  public MockExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+                                   ExecutionBlockContextResponse request) throws IOException {
+    super(workerContext, request, null, null);
+  }
+
+  @Override
+  public void init() throws Throwable {
+    //skip
+  }
+
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index 071d26a..ea609b1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -145,7 +145,7 @@ public class MockTaskExecutor extends TaskExecutor {
       }
 
       @Override
-      public List<Fetcher> getFetchers() {
+      public List<AbstractFetcher> getFetchers() {
         return null;
       }
     };

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 5979bbb..0e114bb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -29,7 +29,6 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
 
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
 
 public class MockTaskManager extends TaskManager {
 
@@ -61,7 +60,7 @@ public class MockTaskManager extends TaskManager {
           .setQueryContext(new QueryContext(new TajoConf()).getProto())
           .setQueryOutputPath("testpath")
           .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-      return new MockExecutionBlock(getWorkerContext(), builder.build());
+      return new MockExecutionBlockContext(getWorkerContext(), builder.build());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
deleted file mode 100644
index dfc37b0..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class TestFetcher {
-  private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
-  private String INPUT_DIR = TEST_DATA+"/in/";
-  private String OUTPUT_DIR = TEST_DATA+"/out/";
-  private TajoConf conf = new TajoConf();
-  private TajoPullServerService pullServerService;
-
-  @Before
-  public void setUp() throws Exception {
-    CommonTestingUtil.getTestDir(TEST_DATA);
-    CommonTestingUtil.getTestDir(INPUT_DIR);
-    CommonTestingUtil.getTestDir(OUTPUT_DIR);
-    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
-    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
-    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
-
-    pullServerService = new TajoPullServerService();
-    pullServerService.init(conf);
-    pullServerService.start();
-  }
-
-  @After
-  public void tearDown(){
-    pullServerService.stop();
-  }
-
-  @Test
-  public void testGet() throws IOException {
-    Random rnd = new Random();
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String partId = "1";
-
-    int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-    String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
-       queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
-
-    Path inputPath = new Path(dataPath);
-    FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
-    for (int i = 0; i < 100; i++) {
-      String data = ""+rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    FileChunk chunk = fetcher.get().get(0);
-    assertNotNull(chunk);
-    assertNotNull(chunk.getFile());
-
-    FileSystem fs = FileSystem.getLocal(new TajoConf());
-    FileStatus inStatus = fs.getFileStatus(inputPath);
-    FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
-
-    assertEquals(inStatus.getLen(), outStatus.getLen());
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testAdjustFetchProcess() {
-    assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
-    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
-    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
-    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
-    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
-    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
-    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
-  }
-
-  @Test
-  public void testStatus() throws Exception {
-    Random rnd = new Random();
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath), true);
-    for (int i = 0; i < 100; i++) {
-      String data = ""+rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testNoContentFetch() throws Exception {
-
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    Path inputPath = new Path(dataPath);
-    FileSystem fs = FileSystem.getLocal(conf);
-    if(fs.exists(inputPath)){
-      fs.delete(new Path(dataPath), true);
-    }
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testFailureStatus() throws Exception {
-    Random rnd = new Random();
-
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-
-    // TajoPullServerService will throw BAD_REQUEST for an unknown shuffle type
-    String shuffleType = "x";
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath), true);
-
-    for (int i = 0; i < 100; i++) {
-      String data = params + rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
-  }
-
-  @Test
-  public void testServerFailure() throws Exception {
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    pullServerService.stop();
-
-    boolean failure = false;
-    try{
-      fetcher.get();
-    } catch (Throwable e){
-      failure = true;
-    }
-    assertTrue(failure);
-    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
new file mode 100644
index 0000000..8844fce
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestFetcherWithTajoPullServer {
+  private enum FetchType {
+    LOCAL,
+    REMOTE
+  }
+  private enum PullServerType {
+    TAJO,
+    YARN
+  }
+
+  private final String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/" +
+      TestFetcherWithTajoPullServer.class.getSimpleName();
+  private final String INPUT_DIR = TEST_DATA+"/in/";
+  private final String OUTPUT_DIR = TEST_DATA+"/out/";
+  private final TajoConf conf = new TajoConf();
+  private Service pullServerService;
+  private final int maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+  private final String TEST_TABLE_NAME = "test";
+  private final FetchType fetchType;
+  private final PullServerType pullServerType;
+  private int pullserverPort;
+
+  public TestFetcherWithTajoPullServer(FetchType fetchType, PullServerType pullServerType) {
+    this.fetchType = fetchType;
+    this.pullServerType = pullServerType;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CommonTestingUtil.getTestDir(TEST_DATA);
+    CommonTestingUtil.getTestDir(INPUT_DIR);
+    CommonTestingUtil.getTestDir(OUTPUT_DIR);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+    if (pullServerType.equals(PullServerType.TAJO)) {
+      pullServerService = new TajoPullServerService();
+    } else {
+      pullServerService = new org.apache.tajo.yarn.TajoPullServerService();
+    }
+    pullServerService.init(conf);
+    pullServerService.start();
+
+    if (pullServerType.equals(PullServerType.TAJO)) {
+      pullserverPort = ((TajoPullServerService)pullServerService).getPort();
+    } else {
+      pullserverPort = ((org.apache.tajo.yarn.TajoPullServerService)pullServerService).getPort();
+    }
+  }
+
+  @After
+  public void tearDown() {
+    pullServerService.stop();
+  }
+
+  @Parameters(name = "{index}: {0}, {1}")
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {FetchType.LOCAL, PullServerType.TAJO},
+        {FetchType.REMOTE, PullServerType.TAJO},
+        {FetchType.LOCAL, PullServerType.YARN},
+        {FetchType.REMOTE, PullServerType.YARN}
+    });
+  }
+
+  private AbstractFetcher getFetcher(URI uri, File data) throws IOException {
+    if (fetchType.equals(FetchType.LOCAL)) {
+      return new LocalFetcher(conf, uri, TEST_TABLE_NAME);
+    } else {
+      FileChunk storeChunk = new FileChunk(data, 0, data.length());
+      storeChunk.setFromRemote(true);
+      return new RemoteFetcher(conf, uri, storeChunk);
+    }
+  }
+
+  @Test
+  public void testGetHashShuffle() throws IOException {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(false).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+
+    FileChunk chunk = fetcher.get().get(0);
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
+
+    FileSystem fs = FileSystem.getLocal(new TajoConf());
+    FileStatus inStatus = fs.getFileStatus(inputPath);
+    FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+    assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testGetRangeShuffle() throws IOException {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String partId = "1";
+    String taskId = "1";
+    String attemptId = "0";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    Path outDir = StorageUtil.concatPath(queryBaseDir, taskId + "_" + attemptId, "output");
+    Path dataPath = StorageUtil.concatPath(outDir, "output");
+    Path indexPath = StorageUtil.concatPath(outDir, "index");
+
+    List<String> strings = new ArrayList<>(100);
+    for (int i = 0; i < 100; i++) {
+      strings.add("" + rnd.nextInt());
+    }
+    Collections.sort(strings);
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    final FSDataOutputStream stream = fs.create(inputPath, true);
+    BSTIndex index = new BSTIndex(conf);
+    Schema schema = SchemaBuilder.builder().addAll(new Column[] {new Column("rnd", Type.TEXT)}).build();
+    SortSpec[] sortSpecs = new SortSpec[] {new SortSpec(schema.getColumn(0))};
+    BSTIndexWriter writer = index.getIndexWriter(new Path(INPUT_DIR, indexPath), BSTIndex.TWO_LEVEL_INDEX, schema, new BaseTupleComparator(schema, sortSpecs), true);
+    writer.init();
+
+    for (String t : strings) {
+      writer.write(new VTuple(new Datum[] {DatumFactory.createText(t)}), stream.getPos());
+      stream.write(t.getBytes());
+    }
+    stream.flush();
+    writer.flush();
+    stream.close();
+    writer.close();
+
+    RangeParam rangeParam = new RangeParam(new TupleRange(sortSpecs,
+        new VTuple(new Datum[] {DatumFactory.createText(strings.get(0))}),
+        new VTuple(new Datum[] {DatumFactory.createText(strings.get(strings.size() - 1))})), true, RowStoreUtil.createEncoder(schema));
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING)
+        .setTaskIds(Lists.newArrayList(Integer.parseInt(taskId)))
+        .setAttemptIds(Lists.newArrayList(Integer.parseInt(attemptId)))
+        .setStartKeyBase64(new String(Base64.encodeBase64(rangeParam.getStart())))
+        .setEndKeyBase64(new String(Base64.encodeBase64(rangeParam.getEnd())))
+        .setLastInclude(true);
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+
+    FileChunk chunk = fetcher.get().get(0);
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
+
+    FileStatus inStatus = fs.getFileStatus(inputPath);
+    FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+    assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testAdjustFetchProcess() {
+    Assert.assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
+  }
+
+  @Test
+  public void testStatus() throws Exception {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testNoContentFetch() throws Exception {
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FileSystem fs = FileSystem.getLocal(conf);
+    if(fs.exists(inputPath)){
+      fs.delete(inputPath, true);
+    }
+
+    FSDataOutputStream stream =  fs.create(inputPath, true);
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    try {
+      fetcher.get();
+      if (fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    } catch (IOException e) {
+      if (fetchType.equals(FetchType.REMOTE)) {
+        fail();
+      }
+    }
+    assertEquals(FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+
+  @Test
+  public void testFailureStatus() throws Exception {
+    Random rnd = new Random();
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType("x") //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+
+    for (int i = 0; i < 100; i++) {
+      String data = "" + rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    try {
+      fetcher.get();
+      if (fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    } catch (IllegalArgumentException e) {
+      if (!fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    }
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+
+  @Test
+  public void testServerFailure() throws Exception {
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    pullServerService.stop();
+
+    boolean failure = false;
+    try{
+      fetcher.get();
+    } catch (IOException e){
+      failure = true;
+    }
+    assertTrue(failure);
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 45e430e..df5b3c8 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -305,7 +305,7 @@ public class TestTaskExecutor {
         }
 
         @Override
-        public List<Fetcher> getFetchers() {
+        public List<AbstractFetcher> getFetchers() {
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2a688e5..ba051a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,6 +50,8 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.storage.*;
@@ -70,7 +72,6 @@ import java.net.URLEncoder;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -1124,89 +1125,32 @@ public class Repartitioner {
   }
 
   public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
-    String scheme = "http://";
-
-    StringBuilder urlPrefix = new StringBuilder(scheme);
+    PullServerRequestURIBuilder builder =
+        new PullServerRequestURIBuilder(fetch.getHost(), fetch.getPort(), maxUrlLength);
     ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
-    urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
-        .append("qid=").append(ebId.getQueryId().toString())
-        .append("&sid=").append(ebId.getId())
-        .append("&p=").append(fetch.getPartitionId())
-        .append("&type=");
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(ebId.getQueryId().toString())
+        .setEbId(ebId.getId())
+        .setPartId(fetch.getPartitionId());
+
     if (fetch.getType() == HASH_SHUFFLE) {
-      urlPrefix.append("h");
+      builder.setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
     } else if (fetch.getType() == RANGE_SHUFFLE) {
-      urlPrefix.append("r").append("&").append(getRangeParam(fetch));
+      builder.setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+      builder.setStartKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeStart().toByteArray())));
+      builder.setEndKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeEnd().toByteArray())));
+      builder.setLastInclude(fetch.getRangeLastInclusive());
     } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
-      urlPrefix.append("s");
+      builder.setShuffleType(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
     }
-
     if (fetch.getLength() >= 0) {
-      urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
+      builder.setOffset(fetch.getOffset()).setLength(fetch.getLength());
     }
-
-    List<URI> fetchURLs = new ArrayList<>();
-    if(includeParts) {
-      if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
-        fetchURLs.add(URI.create(urlPrefix.toString()));
-      } else {
-        urlPrefix.append("&ta=");
-        // If the get request is longer than 2000 characters,
-        // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
-        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
-        // The below code transforms a long request to multiple requests.
-        List<String> taskIdsParams = new ArrayList<>();
-        StringBuilder taskIdListBuilder = new StringBuilder();
-        
-        final List<Integer> taskIds = fetch.getTaskIdList();
-        final List<Integer> attemptIds = fetch.getAttemptIdList();
-
-        // Sort task ids to increase cache hit in pull server
-        final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
-            .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
-            .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
-            .collect(Collectors.toList());
-
-        boolean first = true;
-
-        for (int i = 0; i < taskAndAttemptIds.size(); i++) {
-          StringBuilder taskAttemptId = new StringBuilder();
-
-          if (!first) { // when comma is added?
-            taskAttemptId.append(",");
-          } else {
-            first = false;
-          }
-
-          int taskId = taskAndAttemptIds.get(i).getFirst();
-          if (taskId < 0) {
-            // In the case of hash shuffle each partition has single shuffle file per worker.
-            // TODO If file is large, consider multiple fetching(shuffle file can be split)
-            continue;
-          }
-          int attemptId = taskAndAttemptIds.get(i).getSecond();
-          taskAttemptId.append(taskId).append("_").append(attemptId);
-
-          if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
-            taskIdsParams.add(taskIdListBuilder.toString());
-            taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
-          } else {
-            taskIdListBuilder.append(taskAttemptId);
-          }
-        }
-        // if the url params remain
-        if (taskIdListBuilder.length() > 0) {
-          taskIdsParams.add(taskIdListBuilder.toString());
-        }
-        for (String param : taskIdsParams) {
-          fetchURLs.add(URI.create(urlPrefix + param));
-        }
-      }
-    } else {
-      fetchURLs.add(URI.create(urlPrefix.toString()));
+    if (includeParts) {
+      builder.setTaskIds(fetch.getTaskIdList());
+      builder.setAttemptIds(fetch.getAttemptIdList());
     }
-
-    return fetchURLs;
+    return builder.build(includeParts);
   }
 
   public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) {

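createFetchURL() now delegates both the parameter encoding and the splitting of over-long URLs (the old workaround for HTTP 414 Request-URI Too Long) to PullServerRequestURIBuilder. A minimal sketch using only the setters visible in this patch (host, port, and ids are illustrative):

    PullServerRequestURIBuilder builder =
        new PullServerRequestURIBuilder("worker01", 8080, /* maxUrlLength */ 2000);
    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
        .setQueryId("q_1460419245000_0001")
        .setEbId("1")
        .setPartId("0")
        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
    // build(true) appends the task/attempt id list and may return several
    // URIs when a single request would exceed maxUrlLength.
    List<URI> uris = builder.build(false);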
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
new file mode 100644
index 0000000..a12db77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+public abstract class AbstractFetcher {
+
+  protected final URI uri;
+  protected FileChunk fileChunk;
+  protected final TajoConf conf;
+
+  protected TajoProtos.FetcherState state;
+
+  protected long startTime;
+  protected volatile long finishTime;
+  protected int fileNum;
+  protected long fileLen;
+  protected int messageReceiveCount;
+
+  public AbstractFetcher(TajoConf conf, URI uri) {
+    this(conf, uri, null);
+  }
+
+  public AbstractFetcher(TajoConf conf, URI uri, FileChunk fileChunk) {
+    this.conf = conf;
+    this.uri = uri;
+    this.fileChunk = fileChunk;
+    this.state = TajoProtos.FetcherState.FETCH_INIT;
+  }
+
+  public URI getURI() {
+    return this.uri;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public long getFileLen() {
+    return fileLen;
+  }
+
+  public int getFileNum() {
+    return fileNum;
+  }
+
+  public TajoProtos.FetcherState getState() {
+    return state;
+  }
+
+  public int getMessageReceiveCount() {
+    return messageReceiveCount;
+  }
+
+  public abstract List<FileChunk> get() throws IOException;
+
+  protected void endFetch(FetcherState state) {
+    this.finishTime = System.currentTimeMillis();
+    this.state = state;
+  }
+}

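AbstractFetcher factors the state, timing, and size bookkeeping out of the old Fetcher so that the local and remote implementations added by this patch can share them. A hedged sketch of a minimal subclass (for illustration only; the real LocalFetcher and RemoteFetcher live elsewhere in this change):

    // Hypothetical no-op fetcher: reports its pre-assigned chunk immediately.
    public class NoOpFetcher extends AbstractFetcher {
      public NoOpFetcher(TajoConf conf, URI uri, FileChunk chunk) {
        super(conf, uri, chunk);
      }

      @Override
      public List<FileChunk> get() throws IOException {
        startTime = System.currentTimeMillis();
        endFetch(FetcherState.FETCH_DATA_FINISHED); // records finishTime and state
        return java.util.Collections.singletonList(fileChunk);
      }
    }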
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index e675d70..4ab6627 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.ErrorUtil;
@@ -44,6 +45,7 @@ import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.PullServerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -103,7 +105,7 @@ public class ExecutionBlockContext {
   private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap();
 
   public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request,
-                               AsyncRpcClient queryMasterClient)
+                               AsyncRpcClient queryMasterClient, @Nullable TajoPullServerService pullServerService)
       throws IOException {
     this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
     this.connManager = RpcClientManager.getInstance();
@@ -117,7 +119,7 @@ public class ExecutionBlockContext {
     this.queryEngine = new TajoQueryEngine(systemConf);
     this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
     this.plan = request.getPlanJson();
-    this.resource = new ExecutionBlockSharedResource();
+    this.resource = new ExecutionBlockSharedResource(pullServerService);
     this.workerContext = workerContext;
     this.shuffleType = request.getShuffleType();
     this.queryMasterClient = queryMasterClient;
@@ -281,12 +283,12 @@ public class ExecutionBlockContext {
   }
 
   public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
-    return TajoPullServerService.getBaseOutputDir(
+    return PullServerUtil.getBaseOutputDir(
         executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
   }
 
   public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
-    return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
+    return PullServerUtil.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
   }
 
   public ExecutionBlockId getExecutionBlockId() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index 660f875..e1ff917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
 import org.apache.tajo.engine.codegen.TajoClassLoader;
@@ -35,8 +36,10 @@ import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.util.Pair;
 
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ExecutionBlockSharedResource {
@@ -53,6 +56,15 @@ public class ExecutionBlockSharedResource {
   private ExecutorPreCompiler.CompilationContext compilationContext;
   private LogicalNode plan;
   private boolean codeGenEnabled = false;
+  private final TajoPullServerService pullServerService;
+
+  public ExecutionBlockSharedResource() {
+    this(null);
+  }
+
+  public ExecutionBlockSharedResource(@Nullable TajoPullServerService pullServerService) {
+    this.pullServerService = pullServerService;
+  }
 
   public void initialize(final QueryContext context, final String planJson) {
 
@@ -130,6 +142,10 @@ public class ExecutionBlockSharedResource {
     TableCache.getInstance().releaseCache(id);
   }
 
+  public Optional<TajoPullServerService> getPullServerService() {
+    return pullServerService == null ? Optional.empty() : Optional.of(pullServerService);
+  }
+
   public void release() {
     compilationContext = null;
 

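The shared resource now carries a nullable TajoPullServerService, exposed as an Optional, presumably so that a fetcher can serve chunks from a pull server embedded in the same worker process instead of going over HTTP. A sketch of the intended consumption (the surrounding caller code is illustrative):

    // Hypothetical caller: prefer the in-process pull server when present.
    Optional<TajoPullServerService> service = resource.getPullServerService();
    if (service.isPresent()) {
      // look up chunks directly through the local service
    } else {
      // fall back to the HTTP fetch path
    }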
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
deleted file mode 100644
index 250b4cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.*;
-import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.FetcherState;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NettyUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Fetcher fetches data from a given uri via HTTP protocol and stores them into
- * a specific file. It aims at asynchronous and efficient data transmit.
- */
-public class Fetcher {
-
-  private final static Log LOG = LogFactory.getLog(Fetcher.class);
-
-  private final URI uri;
-  private final FileChunk fileChunk;
-  private final TajoConf conf;
-
-  private final String host;
-  private int port;
-  private final boolean useLocalFile;
-
-  private long startTime;
-  private volatile long finishTime;
-  private long fileLen;
-  private int messageReceiveCount;
-  private TajoProtos.FetcherState state;
-
-  private Bootstrap bootstrap;
-  private List<Long> chunkLengths = new ArrayList<>();
-
-  public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
-    this.uri = uri;
-    this.fileChunk = chunk;
-    this.useLocalFile = !chunk.fromRemote();
-    this.state = TajoProtos.FetcherState.FETCH_INIT;
-    this.conf = conf;
-
-    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
-    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
-    this.port = uri.getPort();
-    if (port == -1) {
-      if (scheme.equalsIgnoreCase("http")) {
-        this.port = 80;
-      } else if (scheme.equalsIgnoreCase("https")) {
-        this.port = 443;
-      }
-    }
-
-    if (!useLocalFile) {
-      bootstrap = new Bootstrap()
-          .group(
-              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
-                  conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
-          .channel(NioSocketChannel.class)
-          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
-          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
-              conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
-          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
-          .option(ChannelOption.TCP_NODELAY, true);
-    }
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public long getFileLen() {
-    return fileLen;
-  }
-
-  public TajoProtos.FetcherState getState() {
-    return state;
-  }
-
-  public int getMessageReceiveCount() {
-    return messageReceiveCount;
-  }
-
-  public List<FileChunk> get() throws IOException {
-    List<FileChunk> fileChunks = new ArrayList<>();
-    if (useLocalFile) {
-      startTime = System.currentTimeMillis();
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FINISHED;
-      fileChunks.add(fileChunk);
-      fileLen = fileChunk.getFile().length();
-      return fileChunks;
-    }
-
-    if (state == FetcherState.FETCH_INIT) {
-      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
-      bootstrap.handler(initializer);
-    }
-
-    this.startTime = System.currentTimeMillis();
-    this.state = TajoProtos.FetcherState.FETCH_FETCHING;
-    ChannelFuture future = null;
-    try {
-      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
-              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-
-      // Wait until the connection attempt succeeds or fails.
-      Channel channel = future.awaitUninterruptibly().channel();
-      if (!future.isSuccess()) {
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-        throw new IOException(future.cause());
-      }
-
-      String query = uri.getPath()
-          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
-      // Prepare the HTTP request.
-      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
-      request.headers().set(HttpHeaders.Names.HOST, host);
-      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Status: " + getState() + ", URI:" + uri);
-      }
-      // Send the HTTP request.
-      channel.writeAndFlush(request);
-
-      // Wait for the server to close the connection; throw an exception on failure
-      channel.closeFuture().syncUninterruptibly();
-
-      fileChunk.setLength(fileChunk.getFile().length());
-
-      long start = 0;
-      for (Long eachChunkLength : chunkLengths) {
-        if (eachChunkLength == 0) continue;
-        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
-        chunk.setEbId(fileChunk.getEbId());
-        chunk.setFromRemote(fileChunk.fromRemote());
-        fileChunks.add(chunk);
-        start += eachChunkLength;
-      }
-      return fileChunks;
-
-    } finally {
-      if(future != null && future.channel().isOpen()){
-        // Close the channel to exit.
-        future.channel().close().awaitUninterruptibly();
-      }
-
-      this.finishTime = System.currentTimeMillis();
-      long elapsedMills = finishTime - startTime;
-      String transferSpeed;
-      if(elapsedMills > 1000) {
-        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
-        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
-      } else {
-        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
-      }
-
-      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
-          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
-    }
-  }
-
-  public URI getURI() {
-    return this.uri;
-  }
-
-  class HttpClientHandler extends ChannelInboundHandlerAdapter {
-    private final File file;
-    private RandomAccessFile raf;
-    private FileChannel fc;
-    private long length = -1;
-
-    public HttpClientHandler(File file) throws FileNotFoundException {
-      this.file = file;
-      this.raf = new RandomAccessFile(file, "rw");
-      this.fc = raf.getChannel();
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      messageReceiveCount++;
-      if (msg instanceof HttpResponse) {
-        try {
-          HttpResponse response = (HttpResponse) msg;
-
-          StringBuilder sb = new StringBuilder();
-          if (LOG.isDebugEnabled()) {
-            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
-                .append(response.getProtocolVersion()).append(", HEADER: ");
-          }
-          if (!response.headers().names().isEmpty()) {
-            for (String name : response.headers().names()) {
-              for (String value : response.headers().getAll(name)) {
-                if (LOG.isDebugEnabled()) {
-                  sb.append(name).append(" = ").append(value);
-                }
-                if (this.length == -1 && name.equals("Content-Length")) {
-                  this.length = Long.parseLong(value);
-                }
-              }
-            }
-            if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
-              String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
-
-              for (String eachSplit : stringOffset.split(",")) {
-                chunkLengths.add(Long.parseLong(eachSplit));
-              }
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sb.toString());
-          }
-
-          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
-            LOG.warn("There are no data corresponding to the request");
-            length = 0;
-            return;
-          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
-            LOG.error(response.getStatus().reasonPhrase());
-            state = TajoProtos.FetcherState.FETCH_FAILED;
-            return;
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-
-      if (msg instanceof HttpContent) {
-        try {
-          HttpContent httpContent = (HttpContent) msg;
-          ByteBuf content = httpContent.content();
-          if (content.isReadable()) {
-            content.readBytes(fc, content.readableBytes());
-          }
-
-          if (msg instanceof LastHttpContent) {
-            if (raf != null) {
-              fileLen = file.length();
-            }
-
-            finishTime = System.currentTimeMillis();
-            if (state != TajoProtos.FetcherState.FETCH_FAILED) {
-              state = TajoProtos.FetcherState.FETCH_FINISHED;
-            }
-
-            IOUtils.cleanup(LOG, fc, raf);
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      if (cause instanceof ReadTimeoutException) {
-        LOG.warn(cause.getMessage(), cause);
-      } else {
-        LOG.error("Fetch failed :", cause);
-      }
-
-      // this fetch will be retried
-      IOUtils.cleanup(LOG, fc, raf);
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FAILED;
-      ctx.close();
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
-        // the channel was closed before the fetch completed
-        finishTime = System.currentTimeMillis();
-        LOG.error("Channel closed by peer: " + ctx.channel());
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-      }
-      IOUtils.cleanup(LOG, fc, raf);
-      
-      super.channelUnregistered(ctx);
-    }
-  }
-
-  class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
-    private final File file;
-
-    public HttpClientChannelInitializer(File file) {
-      this.file = file;
-    }
-
-    @Override
-    protected void initChannel(Channel channel) throws Exception {
-      ChannelPipeline pipeline = channel.pipeline();
-
-      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
-      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
-
-      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
-      pipeline.addLast("inflater", new HttpContentDecompressor());
-      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
-      pipeline.addLast("handler", new HttpClientHandler(file));
-    }
-  }
-}
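
For context, here is a minimal usage sketch of the Fetcher removed above. It is a sketch only, assuming a reachable pull server; the URI string, output file path, and chunk setup are hypothetical and not taken from the Tajo sources, while the query parameters (qid, type, sid, p, ta) follow the ones parsed by the old pull server code below.

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.pullserver.retriever.FileChunk;
    import org.apache.tajo.worker.Fetcher;

    import java.io.File;
    import java.net.URI;
    import java.util.List;

    public class FetcherUsageSketch {
      public static void main(String[] args) throws Exception {
        TajoConf conf = new TajoConf();
        // Hypothetical pull server URI; real ones are built by the query engine.
        URI uri = URI.create("http://worker-host:8080/?qid=q_0_0001&type=h&sid=0&p=0&ta=0_0");
        // Fetched bytes are written to this local file (path is illustrative).
        FileChunk chunk = new FileChunk(new File("/tmp/fetch-output"), 0, 0);
        chunk.setFromRemote(true); // take the HTTP path, not the local-file shortcut

        Fetcher fetcher = new Fetcher(conf, uri, chunk);
        List<FileChunk> chunks = fetcher.get(); // blocks until the server closes the connection
        System.out.println("state=" + fetcher.getState() + ", bytes=" + fetcher.getFileLen());
      }
    }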


[2/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
index c90f1aa..75f6080 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -18,22 +18,47 @@
 
 package org.apache.tajo.pullserver;
 
-import org.apache.commons.lang.reflect.MethodUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.LoadingCache;
+import com.google.gson.Gson;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.Pair;
 
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
+import java.io.*;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class PullServerUtil {
   private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
 
   private static boolean nativeIOPossible = false;
-  private static Method posixFadviseIfPossible;
 
   static {
-    if (NativeIO.isAvailable() && loadNativeIO()) {
+    if (NativeIO.isAvailable()) {
       nativeIOPossible = true;
     } else {
       LOG.warn("Unable to load hadoop nativeIO");
@@ -53,7 +78,7 @@ public class PullServerUtil {
                                             long offset, long len, int flags) {
     if (nativeIOPossible) {
       try {
-        posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, offset, len, flags);
       } catch (Throwable t) {
         nativeIOPossible = false;
         LOG.warn("Failed to manage OS cache for " + identifier, t);
@@ -61,30 +86,643 @@ public class PullServerUtil {
     }
   }
 
-  /* load hadoop native method if possible */
-  private static boolean loadNativeIO() {
-    boolean loaded = true;
-    if (nativeIOPossible) return loaded;
+  public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+    return StorageUtil.concatPath(
+            queryId,
+            "output",
+            executionBlockSequenceId);
+  }
 
-    Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
-    try {
-      Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
-      Class posixClass;
-      if (getCacheManipulator != null) {
-        Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
-        posixClass = posix.getClass();
+  public static Path getBaseInputDir(String queryId, String executionBlockId) {
+    return StorageUtil.concatPath(
+            queryId,
+            "in",
+            executionBlockId);
+  }
+
+  public static List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+
+  public static boolean isChunkRequest(String requestType) {
+    return requestType.equals(PullServerConstants.CHUNK_REQUEST_PARAM_STRING);
+  }
+
+  public static boolean isMetaRequest(String requestType) {
+    return requestType.equals(PullServerConstants.META_REQUEST_PARAM_STRING);
+  }
+
+  public static boolean isRangeShuffle(String shuffleType) {
+    return shuffleType.equals(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+  }
+
+  public static boolean isHashShuffle(String shuffleType) {
+    return shuffleType.equals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        || shuffleType.equals(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
+  }
+
+  public static class PullServerParams extends HashMap<String, List<String>> {
+
+    public PullServerParams(URI uri) {
+      this(uri.toString());
+    }
+
+    public PullServerParams(String uri) {
+      super(new QueryStringDecoder(uri).parameters());
+    }
+
+    public boolean contains(Param param) {
+      return containsKey(param.key());
+    }
+
+    public List<String> get(Param param) {
+      return get(param.key());
+    }
+
+    private String checkAndGetFirstParam(Param param) {
+      Preconditions.checkArgument(contains(param), "Missing " + param.name());
+      Preconditions.checkArgument(get(param).size() == 1, "Too many params: " + param.name());
+      return get(param).get(0);
+    }
+
+    private List<String> checkAndGet(Param param) {
+      Preconditions.checkArgument(contains(param), "Missing " + param.name());
+      return get(param);
+    }
+
+    public String requestType() {
+      return checkAndGetFirstParam(Param.REQUEST_TYPE);
+    }
+
+    public String shuffleType() {
+      return checkAndGetFirstParam(Param.SHUFFLE_TYPE);
+    }
+
+    public String queryId() {
+      return checkAndGetFirstParam(Param.QUERY_ID);
+    }
+
+    public String ebId() {
+      return checkAndGetFirstParam(Param.EB_ID);
+    }
+
+    public long offset() {
+      return contains(Param.OFFSET) && get(Param.OFFSET).size() == 1 ?
+          Long.parseLong(get(Param.OFFSET).get(0)) : -1L;
+    }
+
+    public long length() {
+      return contains(Param.LENGTH) && get(Param.LENGTH).size() == 1 ?
+          Long.parseLong(get(Param.LENGTH).get(0)) : -1L;
+    }
+
+    public String startKey() {
+      return checkAndGetFirstParam(Param.START);
+    }
+
+    public String endKey() {
+      return checkAndGetFirstParam(Param.END);
+    }
+
+    public boolean last() {
+      return contains(Param.FINAL);
+    }
+
+    public String partId() {
+      return checkAndGetFirstParam(Param.PART_ID);
+    }
+
+    public List<String> taskAttemptIds() {
+      return checkAndGet(Param.TASK_ID);
+    }
+  }
+
+  public static class PullServerRequestURIBuilder {
+    private final StringBuilder builder = new StringBuilder("http://");
+    private String requestType;
+    private String shuffleType;
+    private String queryId;
+    private Integer ebId;
+    private Integer partId;
+    private List<Integer> taskIds;
+    private List<Integer> attemptIds;
+    private List<String> taskAttemptIds;
+    private Long offset;
+    private Long length;
+    private String startKeyBase64;
+    private String endKeyBase64;
+    private boolean last;
+    private final int maxUrlLength;
+
+    public PullServerRequestURIBuilder(String pullServerAddr, int pullServerPort, int maxUrlLength) {
+      this(pullServerAddr, Integer.toString(pullServerPort), maxUrlLength);
+    }
+
+    public PullServerRequestURIBuilder(String pullServerAddr, String pullServerPort, int maxUrlLength) {
+      builder.append(pullServerAddr).append(":").append(pullServerPort).append("/?");
+      this.maxUrlLength = maxUrlLength;
+    }
+
+    public List<URI> build(boolean includeTasks) {
+      append(Param.REQUEST_TYPE, requestType)
+          .append(Param.QUERY_ID, queryId)
+          .append(Param.EB_ID, ebId)
+          .append(Param.PART_ID, partId)
+          .append(Param.SHUFFLE_TYPE, shuffleType);
+
+      if (startKeyBase64 != null) {
+
+        try {
+          append(Param.START, URLEncoder.encode(startKeyBase64, "utf-8"))
+              .append(Param.END, URLEncoder.encode(endKeyBase64, "utf-8"));
+        } catch (UnsupportedEncodingException e) {
+          throw new RuntimeException(e);
+        }
+
+        if (last) {
+          append(Param.FINAL, Boolean.toString(last));
+        }
+      }
+
+      if (length != null) {
+        append(Param.OFFSET, offset.toString())
+            .append(Param.LENGTH, length.toString());
+      }
+
+      List<URI> results = new ArrayList<>();
+      if (!includeTasks || isHashShuffle(shuffleType)) {
+        results.add(URI.create(builder.toString()));
       } else {
-        posixClass = NativeIO.POSIX.class;
+        builder.append(Param.TASK_ID.key()).append("=");
+        List<String> taskAttemptIds = this.taskAttemptIds;
+        if (taskAttemptIds == null) {
+
+          // Sort task ids to increase cache hits in the pull server
+          taskAttemptIds = IntStream.range(0, taskIds.size())
+              .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+              .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+              // In the case of hash shuffle, each partition has a single shuffle file per worker.
+              // TODO If the file is large, consider multiple fetches (the shuffle file can be split)
+              .filter(pair -> pair.getFirst() >= 0)
+              .map(pair -> pair.getFirst() + "_" + pair.getSecond())
+              .collect(Collectors.toList());
+        }
+
+        // If the GET request is longer than 2000 characters, the long request
+        // URI may cause an HTTP 414 (Request-URI Too Long) error.
+        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+        // The code below splits such a long request into multiple requests.
+        List<String> taskIdsParams = new ArrayList<>();
+        StringBuilder taskIdListBuilder = new StringBuilder();
+
+        boolean first = true;
+        for (int i = 0; i < taskAttemptIds.size(); i++) {
+          if (!first) {
+            taskIdListBuilder.append(",");
+          }
+          first = false;
+
+          if (builder.length() + taskIdListBuilder.length() > maxUrlLength) {
+            taskIdsParams.add(taskIdListBuilder.toString());
+            taskIdListBuilder = new StringBuilder(taskAttemptIds.get(i));
+          } else {
+            taskIdListBuilder.append(taskAttemptIds.get(i));
+          }
+        }
+        // if the url params remain
+        if (taskIdListBuilder.length() > 0) {
+          taskIdsParams.add(taskIdListBuilder.toString());
+        }
+        for (String param : taskIdsParams) {
+          results.add(URI.create(builder + param));
+        }
+      }
+
+      return results;
+    }
+
+    private PullServerRequestURIBuilder append(Param key, Object val) {
+      builder.append(key.key())
+          .append("=")
+          .append(val)
+          .append("&");
+
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setRequestType(String type) {
+      this.requestType = type;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setShuffleType(String shuffleType) {
+      this.shuffleType = shuffleType;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setQueryId(String queryId) {
+      this.queryId = queryId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEbId(String ebId) {
+      this.ebId = Integer.parseInt(ebId);
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEbId(Integer ebId) {
+      this.ebId = ebId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setPartId(String partId) {
+      this.partId = Integer.parseInt(partId);
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setPartId(Integer partId) {
+      this.partId = partId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setTaskIds(List<Integer> taskIds) {
+      this.taskIds = taskIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setAttemptIds(List<Integer> attemptIds) {
+      this.attemptIds = attemptIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setTaskAttemptIds(List<String> taskAttemptIds) {
+      this.taskAttemptIds = taskAttemptIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setOffset(long offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setLength(long length) {
+      this.length = length;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setStartKeyBase64(String startKeyBase64) {
+      this.startKeyBase64 = startKeyBase64;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEndKeyBase64(String endKeyBase64) {
+      this.endKeyBase64 = endKeyBase64;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setLastInclude(boolean last) {
+      this.last = last;
+      return this;
+    }
+  }
+
+  public static boolean useExternalPullServerService(TajoConf conf) {
+    // TODO: add more service types like mesos
+    return TajoPullServerService.isStandalone()
+        || conf.getBoolVar(ConfVars.YARN_SHUFFLE_SERVICE_ENABLED);
+  }
+
+  private static FileChunkMeta searchFileChunkMeta(String queryId,
+                                                  String ebSeqId,
+                                                  String taskId,
+                                                  Path outDir,
+                                                  String startKey,
+                                                  String endKey,
+                                                  boolean last,
+                                                  LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                                  int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+    SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+        indexReaderCache, lowCacheHitCheckThreshold);
+    // Do not send file chunks of 0 length
+    if (result != null) {
+      long startOffset = result.startOffset;
+      long endOffset = result.endOffset;
+
+      FileChunkMeta chunk = new FileChunkMeta(startOffset, endOffset - startOffset, ebSeqId, taskId);
+
+      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+      return chunk;
+    } else {
+      return null;
+    }
+  }
+
+  private static FileChunk searchFileChunk(String queryId,
+                                           String ebSeqId,
+                                           Path outDir,
+                                           String startKey,
+                                           String endKey,
+                                           boolean last,
+                                           LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                           int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+
+    final SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+        indexReaderCache, lowCacheHitCheckThreshold);
+    if (result != null) {
+      long startOffset = result.startOffset;
+      long endOffset = result.endOffset;
+      FileChunk chunk = new FileChunk(result.data, startOffset, endOffset - startOffset);
+
+      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+      return chunk;
+    } else {
+      return null;
+    }
+  }
+
+  private static class SearchResult {
+    File data;
+    long startOffset;
+    long endOffset;
+
+    public SearchResult(File data, long startOffset, long endOffset) {
+      this.data = data;
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+  }
+
+  private static SearchResult searchCorrespondPart(String queryId,
+                                                   String ebSeqId,
+                                                   Path outDir,
+                                                   String startKey,
+                                                   String endKey,
+                                                   boolean last,
+                                                   LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                                   int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+    BSTIndexReader idxReader = indexReaderCache.get(new IndexCacheKey(outDir, queryId, ebSeqId));
+    idxReader.retain();
+
+    File data;
+    long startOffset;
+    long endOffset;
+    try {
+      if (LOG.isDebugEnabled()) {
+        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+        }
+      }
+
+      Tuple indexedFirst = idxReader.getFirstKey();
+      Tuple indexedLast = idxReader.getLastKey();
+
+      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no content");
+        }
+        return null;
+      }
+
+      byte[] startBytes = Base64.decodeBase64(startKey);
+      byte[] endBytes = Base64.decodeBase64(endKey);
+
+
+      Tuple start;
+      Tuple end;
+      Schema keySchema = idxReader.getKeySchema();
+      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
+      try {
+        start = decoder.toTuple(startBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("StartKey: " + startKey
+            + ", decoded byte size: " + startBytes.length, t);
+      }
+
+      try {
+        end = decoder.toTuple(endBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("EndKey: " + endKey
+            + ", decoded byte size: " + endBytes.length, t);
       }
-      posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
-    } catch (Throwable e) {
-      loaded = false;
-      LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage(), e);
+
+      data = new File(URI.create(outDir.toUri() + "/output"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+            (last ? ", last=true" : "") + ")");
+      }
+
+      TupleComparator comparator = idxReader.getComparator();
+
+      if (comparator.compare(end, indexedFirst) < 0 ||
+          comparator.compare(indexedLast, start) < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Out of scope (indexed data [" + indexedFirst + ", " + indexedLast +
+              "], but request start: " + start + ", end: " + end + ")");
+        }
+        return null;
+      }
+
+      try {
+        idxReader.init();
+        startOffset = idxReader.find(start);
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+      try {
+        endOffset = idxReader.find(end);
+        if (endOffset == -1) {
+          endOffset = idxReader.find(end, true);
+        }
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+
+      // if startOffset == -1 then case 2-1 or case 3
+      if (startOffset == -1) { // this is a hack
+        // if case 2-1 or case 3
+        try {
+          startOffset = idxReader.find(start, true);
+        } catch (IOException ioe) {
+          LOG.error("State Dump (the requested range: "
+              + "[" + start + ", " + end + ")" + ", idx min: "
+              + idxReader.getFirstKey() + ", idx max: "
+              + idxReader.getLastKey());
+          throw ioe;
+        }
+      }
+
+      if (startOffset == -1) {
+        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+            "State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+      }
+
+      // if greater than indexed values
+      if (last || (endOffset == -1
+          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+        endOffset = data.length();
+      }
+    } finally {
+      idxReader.release();
     }
 
-    if (posixFadviseIfPossible == null) {
-      loaded = false;
+    return new SearchResult(data, startOffset, endOffset);
+  }
+
+  /**
+   * Retrieves meta information of the file chunks which correspond to the requested URI.
+   * Only meta information for the file chunks which have non-zero length is retrieved.
+   *
+   * @param conf
+   * @param lDirAlloc
+   * @param localFS
+   * @param params
+   * @param gson
+   * @param indexReaderCache
+   * @param lowCacheHitCheckThreshold
+   * @return
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public static List<String> getJsonMeta(final TajoConf conf,
+                                         final LocalDirAllocator lDirAlloc,
+                                         final FileSystem localFS,
+                                         final PullServerParams params,
+                                         final Gson gson,
+                                         final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                         final int lowCacheHitCheckThreshold)
+      throws IOException, ExecutionException {
+    final List<String> taskIds = PullServerUtil.splitMaps(params.taskAttemptIds());
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+    final List<String> jsonMetas = new ArrayList<>();
+
+    for (String eachTaskId : taskIds) {
+      Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+      if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+        LOG.warn("Range shuffle - file does not exist: " + outputPath);
+        continue;
+      }
+      Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+      FileChunkMeta meta;
+      meta = PullServerUtil.searchFileChunkMeta(params.queryId(), params.ebId(), eachTaskId, path,
+          params.startKey(), params.endKey(), params.last(), indexReaderCache, lowCacheHitCheckThreshold);
+      if (meta != null && meta.getLength() > 0) {
+        String jsonStr = gson.toJson(meta, FileChunkMeta.class);
+        jsonMetas.add(jsonStr);
+      }
+    }
+    return jsonMetas;
+  }
+
+  /**
+   * Retrieves the file chunks which correspond to the requested URI.
+   * Only the file chunks which have non-zero length are retrieved.
+   *
+   * @param conf
+   * @param lDirAlloc
+   * @param localFS
+   * @param params
+   * @param indexReaderCache
+   * @param lowCacheHitCheckThreshold
+   * @return
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public static List<FileChunk> getFileChunks(final TajoConf conf,
+                                              final LocalDirAllocator lDirAlloc,
+                                              final FileSystem localFS,
+                                              final PullServerParams params,
+                                              final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                              final int lowCacheHitCheckThreshold)
+      throws IOException, ExecutionException {
+    final List<FileChunk> chunks = new ArrayList<>();
+
+    final String queryId = params.queryId();
+    final String shuffleType = params.shuffleType();
+    final String sid =  params.ebId();
+
+    final long offset = params.offset();
+    final long length = params.length();
+
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId, sid);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid);
+
+      // the working dir of tajo worker for each query
+      LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+    }
+
+    // if a stage requires a range shuffle
+    if (PullServerUtil.isRangeShuffle(shuffleType)) {
+      final List<String> taskIdList = params.taskAttemptIds();
+      final List<String> taskIds = PullServerUtil.splitMaps(taskIdList);
+
+      final String startKey = params.startKey();
+      final String endKey = params.endKey();
+      final boolean last = params.last();
+
+      long before = System.currentTimeMillis();
+      for (String eachTaskId : taskIds) {
+        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+        if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+          LOG.warn(outputPath + " does not exist.");
+          continue;
+        }
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+        FileChunk chunk = PullServerUtil.searchFileChunk(queryId, sid, path, startKey, endKey, last, indexReaderCache,
+            lowCacheHitCheckThreshold);
+        if (chunk != null) {
+          chunks.add(chunk);
+        }
+      }
+      long after = System.currentTimeMillis();
+      LOG.info("Index lookup time: " + (after - before) + " ms");
+
+      // if a stage requires a hash shuffle or a scattered hash shuffle
+    } else if (PullServerUtil.isHashShuffle(shuffleType)) {
+
+      final String partId = params.partId();
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+      if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
+        throw new FileNotFoundException(partPath.toString());
+      }
+
+      Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        String errorMessage = "Start pos [" + startPos + "] greater than file length [" + file.length() + "]";
+        throw new EOFException(errorMessage);
+      }
+      FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunks.add(chunk);
+    } else {
+      throw new IllegalArgumentException(shuffleType);
     }
-    return loaded;
+    return chunks.stream().filter(c -> c.length() > 0).collect(Collectors.toList());
   }
 }
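
For context, here is a minimal sketch of how the new PullServerRequestURIBuilder and PullServerParams fit together. The host, port, and ids are hypothetical; the constants and methods are the ones shown in the code above.

    import org.apache.tajo.pullserver.PullServerConstants;
    import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
    import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;

    import java.net.URI;
    import java.util.List;

    public class PullServerUriSketch {
      public static void main(String[] args) {
        // maxUrlLength mirrors ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH; 2000 here is illustrative.
        PullServerRequestURIBuilder builder =
            new PullServerRequestURIBuilder("192.168.0.1", 8080, 2000);

        List<URI> uris = builder
            .setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
            .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
            .setQueryId("q_0_0001") // hypothetical query id
            .setEbId(1)
            .setPartId(0)
            .build(true);           // hash shuffle -> a single URI, no task id list

        // The server side parses the same URI back into typed accessors.
        PullServerParams params = new PullServerParams(uris.get(0));
        System.out.println(params.requestType() + ", " + params.shuffleType()
            + ", part " + params.partId());
      }
    }

If a range shuffle request carries many task attempt ids, build(true) instead returns multiple URIs so that each stays under maxUrlLength, avoiding HTTP 414 errors.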

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index 4609712..069e660 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -54,11 +54,6 @@ public class TajoPullServer extends CompositeService {
     start();
   }
 
-  public void start() {
-    super.start();
-
-  }
-
   public static void main(String[] args) throws Exception {
     StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG);
 

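The override removed above only delegated to super.start(), so dropping it changes nothing. The broader pattern in this commit (see TajoPullServerService below) is to override the serviceInit/serviceStop hooks rather than init/stop. A minimal sketch of that Hadoop service lifecycle, with an illustrative service name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.AbstractService;

    public class LifecycleSketch extends AbstractService {

      public LifecycleSketch() {
        super("lifecycle-sketch"); // hypothetical service name
      }

      @Override
      protected void serviceInit(Configuration conf) throws Exception {
        // allocate resources that depend only on configuration
        super.serviceInit(conf);
      }

      @Override
      protected void serviceStart() throws Exception {
        // bind sockets, start threads
        super.serviceStart();
      }

      @Override
      protected void serviceStop() throws Exception {
        // release resources; invoked at most once by stop()
        super.serviceStop();
      }

      public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.init(new Configuration()); // NOTINITED -> INITED, calls serviceInit
        s.start();                   // INITED -> STARTED, calls serviceStart
        s.stop();                    // STARTED -> STOPPED, calls serviceStop
      }
    }
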
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index cbeba52..aa16f87 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -22,8 +22,9 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
-import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.channel.group.ChannelGroup;
@@ -31,12 +32,13 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,42 +58,34 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
 import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.*;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 public class TajoPullServerService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
 
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
   private int port;
   private ServerBootstrap selector;
   private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@@ -99,7 +93,6 @@ public class TajoPullServerService extends AbstractService {
   private int sslFileBufferSize;
   private int maxUrlLength;
 
-  private ApplicationId appId;
   private FileSystem localFS;
 
   /**
@@ -110,62 +103,14 @@ public class TajoPullServerService extends AbstractService {
   private int readaheadLength;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
-  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
-  private static final Map<String,String> userRsrc =
-          new ConcurrentHashMap<>();
-  private String userName;
-
-  private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
-  private static int lowCacheHitCheckThreshold;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "tajo.pullserver.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+  private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+  private int lowCacheHitCheckThreshold;
 
   private static final boolean STANDALONE;
 
-  private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
-  private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
-
-  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
-
-  static class CacheKey {
-    private Path path;
-    private String queryId;
-    private String ebSeqId;
-
-    public CacheKey(Path path, String queryId, String ebSeqId) {
-      this.path = path;
-      this.queryId = queryId;
-      this.ebSeqId = ebSeqId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof CacheKey) {
-        CacheKey other = (CacheKey) o;
-        return Objects.equals(this.path, other.path)
-            && Objects.equals(this.queryId, other.queryId)
-            && Objects.equals(this.ebSeqId, other.ebSeqId);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(path, queryId, ebSeqId);
-    }
-  }
-
   static {
-    /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
-    SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
-    REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
-
-    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
+    String standalone = System.getenv(PullServerConstants.PULLSERVER_STANDALONE_ENV_KEY);
+    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase(Boolean.TRUE.toString());
   }
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,7 +138,7 @@ public class TajoPullServerService extends AbstractService {
   final ShuffleMetrics metrics;
 
   TajoPullServerService(MetricsSystem ms) {
-    super("httpshuffle");
+    super(PullServerConstants.PULLSERVER_SERVICE_NAME);
     metrics = ms.register(new ShuffleMetrics());
   }
 
@@ -202,58 +147,33 @@ public class TajoPullServerService extends AbstractService {
     this(DefaultMetricsSystem.instance());
   }
 
-  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
-    // TODO these bytes should be versioned
-    // TODO: Once SHuffle is out of NM, this can use MR APIs
-    this.appId = appId;
-    this.userName = user;
-    userRsrc.put(appId.toString(), user);
-  }
-
-  public void stopApp(ApplicationId appId) {
-    userRsrc.remove(appId.toString());
-  }
-
+  // TODO change AbstractService to throw InterruptedException
   @Override
-  public void init(Configuration conf) {
-    try {
-      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-          DEFAULT_SHUFFLE_READAHEAD_BYTES);
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    TajoConf tajoConf = (TajoConf) conf;
 
-      int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
-          Runtime.getRuntime().availableProcessors() * 2);
+    manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+        PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
 
-      selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
-                   .option(ChannelOption.TCP_NODELAY, true)
-                   .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
-                   .childOption(ChannelOption.TCP_NODELAY, true);
+    readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+        PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
 
-      localFS = new LocalFileSystem();
+    int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
 
-      maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
-          ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+    selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
+        .option(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .childOption(ChannelOption.TCP_NODELAY, true);
 
-      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
-          , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
-      super.init(conf);
-      LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    }
-  }
+    localFS = new LocalFileSystem();
 
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
+    maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
 
     ServerBootstrap bootstrap = selector.clone();
-    TajoConf tajoConf = (TajoConf)conf;
     try {
       channelInitializer = new HttpChannelInitializer(tajoConf);
     } catch (Exception ex) {
@@ -263,7 +183,7 @@ public class TajoPullServerService extends AbstractService {
       .channel(NioServerSocketChannel.class);
 
     port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
-    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+    ChannelFuture future = bootstrap.bind(port)
         .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
         .syncUninterruptibly();
 
@@ -272,8 +192,8 @@ public class TajoPullServerService extends AbstractService {
     tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
     LOG.info(getName() + " listening on port " + port);
 
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    sslFileBufferSize = conf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
     int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
     int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
@@ -283,10 +203,10 @@ public class TajoPullServerService extends AbstractService {
         .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
         .removalListener(removalListener)
         .build(
-            new CacheLoader<CacheKey, BSTIndexReader>() {
+            new CacheLoader<IndexCacheKey, BSTIndexReader>() {
               @Override
-              public BSTIndexReader load(CacheKey key) throws Exception {
-                return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+              public BSTIndexReader load(IndexCacheKey key) throws Exception {
+                return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
               }
             }
         );
@@ -353,29 +273,31 @@ public class TajoPullServerService extends AbstractService {
   }
 
   @Override
-  public void stop() {
-    try {
-      accepted.close();
-      if (selector != null) {
-        if (selector.group() != null) {
-          selector.group().shutdownGracefully();
-        }
-        if (selector.childGroup() != null) {
-          selector.childGroup().shutdownGracefully();
-        }
+  public void serviceStop() throws Exception {
+    accepted.close();
+    if (selector != null) {
+      if (selector.group() != null) {
+        selector.group().shutdownGracefully();
       }
-
-      if (channelInitializer != null) {
-        channelInitializer.destroy();
+      if (selector.childGroup() != null) {
+        selector.childGroup().shutdownGracefully();
       }
+    }
 
-      localFS.close();
-      indexReaderCache.invalidateAll();
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    } finally {
-      super.stop();
+    if (channelInitializer != null) {
+      channelInitializer.destroy();
     }
+
+    localFS.close();
+    indexReaderCache.invalidateAll();
+
+    super.serviceStop();
+  }
+
+  public List<FileChunk> getFileChunks(TajoConf conf, LocalDirAllocator lDirAlloc, PullServerParams params)
+      throws IOException, ExecutionException {
+    return PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+        lowCacheHitCheckThreshold);
   }
 
   class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -385,8 +307,7 @@ public class TajoPullServerService extends AbstractService {
 
     public HttpChannelInitializer(TajoConf conf) throws Exception {
       PullServer = new PullServer(conf);
-      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
-          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+      if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
         sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
         sslFactory.init();
       }
@@ -417,72 +338,13 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-
-  Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<>();
-
-  public void completeFileChunk(FileRegion filePart,
-                                   String requestUri,
-                                   long startTime) {
-    ProcessingStatus status = processingStatusMap.get(requestUri);
-    if (status != null) {
-      status.decrementRemainFiles(filePart, startTime);
-    }
-  }
-
-  class ProcessingStatus {
-    String requestUri;
-    int numFiles;
-    long startTime;
-    long makeFileListTime;
-    long minTime = Long.MAX_VALUE;
-    long maxTime;
-    volatile int numSlowFile;
-    volatile int remainFiles;
-
-    public ProcessingStatus(String requestUri) {
-      this.requestUri = requestUri;
-      this.startTime = System.currentTimeMillis();
-    }
-
-    public void setNumFiles(int numFiles) {
-      this.numFiles = numFiles;
-      this.remainFiles = numFiles;
-    }
-
-    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
-      long fileSendTime = System.currentTimeMillis() - fileStartTime;
-
-      if (fileSendTime > maxTime) {
-        maxTime = fileSendTime;
-      }
-      if (fileSendTime < minTime) {
-        minTime = fileSendTime;
-      }
-
-      if (fileSendTime > 20 * 1000) {
-        LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " +
-            "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri);
-        SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1);
-      }
-
-      REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
-      if (REMAIN_FILE_UPDATER.get(this) <= 0) {
-        processingStatusMap.remove(requestUri);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
-              + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
-              + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
-        }
-      }
-    }
-  }
-
   @ChannelHandler.Sharable
   class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
     private final TajoConf conf;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    private final Gson gson = new Gson();
 
     public PullServer(TajoConf conf) throws IOException {
       this.conf = conf;
@@ -512,7 +374,7 @@ public class TajoPullServerService extends AbstractService {
       }
 
       if (request.getMethod() == HttpMethod.DELETE) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
 
         clearIndexCache(request.getUri());
@@ -523,110 +385,117 @@ public class TajoPullServerService extends AbstractService {
       }
 
       // Parsing the URL into key-values
-      Map<String, List<String>> params = null;
       try {
-        params = decodeParams(request.getUri());
+        final PullServerParams params = new PullServerParams(request.getUri());
+        if (PullServerUtil.isChunkRequest(params.requestType())) {
+          handleChunkRequest(ctx, request, params);
+        } else {
+          handleMetaRequest(ctx, request, params);
+        }
       } catch (Throwable e) {
-        LOG.error("Failed to decode uri " + request.getUri());
+        LOG.error("Failed to handle request " + request.getUri());
         sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
         return;
       }
+    }
 
-      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
-      processingStatusMap.put(request.getUri(), processingStatus);
-
-      String partId = params.get("p").get(0);
-      String queryId = params.get("qid").get(0);
-      String shuffleType = params.get("type").get(0);
-      String sid =  params.get("sid").get(0);
-
-      final List<String> taskIdList = params.get("ta");
-      final List<String> offsetList = params.get("offset");
-      final List<String> lengthList = params.get("length");
-
-      long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-      long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-      List<String> taskIds = splitMaps(taskIdList);
-
-      Path queryBaseDir = getBaseOutputDir(queryId, sid);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-            + ", taskIds=" + taskIdList);
-
-        // the working dir of tajo worker for each query
-        LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+    /**
+     * Upon a request from a TajoWorker, this method clears the index cache used for fetching
+     * data of an execution block. It is called whenever an execution block is completed.
+     *
+     * @param uri query URI which indicates the execution block id
+     * @throws IOException
+     * @throws InvalidURLException
+     */
+    public void clearIndexCache(String uri)
+        throws IOException, InvalidURLException {
+      // Simply parse the given uri
+      String[] tokens = uri.split("=");
+      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+        throw new IllegalArgumentException("invalid params: " + uri);
       }
-
-      final List<FileChunk> chunks = Lists.newArrayList();
-
-      // if a stage requires a range shuffle
-      if (shuffleType.equals("r")) {
-        final String startKey = params.get("start").get(0);
-        final String endKey = params.get("end").get(0);
-        final boolean last = params.get("final") != null;
-
-        long before = System.currentTimeMillis();
-        for (String eachTaskId : taskIds) {
-          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
-          if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
-            LOG.warn(outputPath + "does not exist.");
-            continue;
-          }
-          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
-
-          FileChunk chunk;
-          try {
-            chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
-          } catch (Throwable t) {
-            LOG.error("ERROR Request: " + request.getUri(), t);
-            sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
-            return;
+      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+      String queryId = ebId.getQueryId().toString();
+      String ebSeqId = Integer.toString(ebId.getId());
+      List<IndexCacheKey> removed = new ArrayList<>();
+      synchronized (indexReaderCache) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
           }
-          if (chunk != null) {
-            chunks.add(chunk);
+        }
+        indexReaderCache.invalidateAll(removed);
+      }
+      removed.clear();
+      synchronized (waitForRemove) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
           }
         }
-        long after = System.currentTimeMillis();
-        LOG.info("Index lookup time: " + (after - before) + " ms");
-
-        // if a stage requires a hash shuffle or a scattered hash shuffle
-      } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-        int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-        Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
-        if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
-          LOG.warn("Partition shuffle file not exists: " + partPath);
-          sendError(ctx, HttpResponseStatus.NO_CONTENT);
-          return;
+        for (IndexCacheKey eachKey : removed) {
+          waitForRemove.remove(eachKey);
         }
+      }
+    }
+
+    private void handleMetaRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+        throws IOException, ExecutionException {
+      final List<String> jsonMetas;
+      try {
+        jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
+      } catch (ExecutionException e) {
+        // A failure occurred while loading an index reader into the cache
+        throw new TajoInternalError(e.getCause());
+      }
 
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.OK,
+          Unpooled.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+      response.headers().set(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+      HttpHeaders.setContentLength(response, response.content().readableBytes());
+      if (HttpHeaders.isKeepAlive(request)) {
+        response.headers().set(Names.CONNECTION, Values.KEEP_ALIVE);
+      }
+      ChannelFuture writeFuture = ctx.writeAndFlush(response);
 
-        File file = new File(path.toUri());
-        long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-        long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+      // Decide whether to close the connection or not.
+      if (!HttpHeaders.isKeepAlive(request)) {
+        // Close the connection when the whole content is written out.
+        writeFuture.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
 
-        if (startPos >= file.length()) {
-          String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
-          LOG.error(errorMessage);
-          sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
-          return;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
-        }
-        FileChunk chunk = new FileChunk(file, startPos, readLen);
-        chunks.add(chunk);
-      } else {
-        LOG.error("Unknown shuffle type: " + shuffleType);
-        sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
+    private void handleChunkRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+        throws IOException {
+      final List<FileChunk> chunks;
+      try {
+        chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
         return;
+      } catch (ExecutionException e) {
+        // A failure occurred while loading an index reader into the cache
+        throw new TajoInternalError(e.getCause());
       }
 
       // Write the content.
       if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
 
         if (!HttpHeaders.isKeepAlive(request)) {
           ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -637,7 +506,7 @@ public class TajoPullServerService extends AbstractService {
       } else {
         FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
         ChannelFuture writeFuture = null;
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
         StringBuilder sb = new StringBuilder();
         for (FileChunk chunk : file) {
@@ -645,7 +514,7 @@ public class TajoPullServerService extends AbstractService {
           sb.append(Long.toString(chunk.length())).append(",");
         }
         sb.deleteCharAt(sb.length() - 1);
-        HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
+        HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
         HttpHeaders.setContentLength(response, totalSize);
 
         if (HttpHeaders.isKeepAlive(request)) {
@@ -655,7 +524,7 @@ public class TajoPullServerService extends AbstractService {
         writeFuture = ctx.write(response);
 
         for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, chunk, request.getUri());
+          writeFuture = sendFile(ctx, chunk);
           if (writeFuture == null) {
             sendError(ctx, HttpResponseStatus.NOT_FOUND);
             return;
@@ -676,53 +545,8 @@ public class TajoPullServerService extends AbstractService {
       }
     }
 
-    /**
-     * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
-     * It is called whenever an execution block is completed.
-     *
-     * @param uri query URI which indicates the execution block id
-     * @throws IOException
-     * @throws InvalidURLException
-     */
-    private void clearIndexCache(String uri) throws IOException, InvalidURLException {
-      // Simply parse the given uri
-      String[] tokens = uri.split("=");
-      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
-        throw new IllegalArgumentException("invalid params: " + uri);
-      }
-      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
-      String queryId = ebId.getQueryId().toString();
-      String ebSeqId = Integer.toString(ebId.getId());
-      List<CacheKey> removed = new ArrayList<>();
-      synchronized (indexReaderCache) {
-        for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
-          CacheKey key = e.getKey();
-          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
-            e.getValue().forceClose();
-            removed.add(e.getKey());
-          }
-        }
-        indexReaderCache.invalidateAll(removed);
-      }
-      removed.clear();
-      synchronized (waitForRemove) {
-        for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
-          CacheKey key = e.getKey();
-          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
-            e.getValue().forceClose();
-            removed.add(e.getKey());
-          }
-        }
-        for (CacheKey eachKey : removed) {
-          waitForRemove.remove(eachKey);
-        }
-      }
-    }
-
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   FileChunk file,
-                                   String requestUri) throws IOException {
-      long startTime = System.currentTimeMillis();
+                                   FileChunk file) throws IOException {
      RandomAccessFile spill = null;
       ChannelFuture writeFuture;
       try {
@@ -732,7 +556,7 @@ public class TajoPullServerService extends AbstractService {
               file.startOffset(), file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
           writeFuture = ctx.write(filePart);
-          writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
+          writeFuture.addListener(new FileCloseListener(filePart));
         } else {
           // HTTPS cannot be done with zero copy.
           final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -763,9 +587,10 @@ public class TajoPullServerService extends AbstractService {
 
     private void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
-      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
-          Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+      ByteBuf content = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, content);
       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+      HttpHeaders.setContentLength(response, content.writerIndex());
 
       // Close the connection as soon as the error message is sent.
       ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -782,12 +607,12 @@ public class TajoPullServerService extends AbstractService {
   }
 
  // Temporary space to wait for the completion of all index lookup operations
-  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
 
   // RemovalListener is triggered when an item is removed from the index reader cache.
   // It closes index readers when they are not used anymore.
   // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
-  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+  private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
     BSTIndexReader reader = removal.getValue();
     if (reader.getReferenceNum() == 0) {
       try {
@@ -800,180 +625,4 @@ public class TajoPullServerService extends AbstractService {
       waitForRemove.put(removal.getKey(), reader);
     }
   };
-
-  public static FileChunk getFileChunks(String queryId,
-                                        String ebSeqId,
-                                        Path outDir,
-                                        String startKey,
-                                        String endKey,
-                                        boolean last) throws IOException, ExecutionException {
-
-    BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
-    idxReader.retain();
-
-    File data;
-    long startOffset;
-    long endOffset;
-    try {
-      if (LOG.isDebugEnabled()) {
-        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
-          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
-        }
-      }
-
-      Tuple indexedFirst = idxReader.getFirstKey();
-      Tuple indexedLast = idxReader.getLastKey();
-
-      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("There is no contents");
-        }
-        return null;
-      }
-
-      byte[] startBytes = Base64.decodeBase64(startKey);
-      byte[] endBytes = Base64.decodeBase64(endKey);
-
-
-      Tuple start;
-      Tuple end;
-      Schema keySchema = idxReader.getKeySchema();
-      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-
-      try {
-        start = decoder.toTuple(startBytes);
-      } catch (Throwable t) {
-        throw new IllegalArgumentException("StartKey: " + startKey
-            + ", decoded byte size: " + startBytes.length, t);
-      }
-
-      try {
-        end = decoder.toTuple(endBytes);
-      } catch (Throwable t) {
-        throw new IllegalArgumentException("EndKey: " + endKey
-            + ", decoded byte size: " + endBytes.length, t);
-      }
-
-      data = new File(URI.create(outDir.toUri() + "/output"));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
-            (last ? ", last=true" : "") + ")");
-      }
-
-      TupleComparator comparator = idxReader.getComparator();
-
-      if (comparator.compare(end, indexedFirst) < 0 ||
-          comparator.compare(indexedLast, start) < 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
-              "], but request start:" + start + ", end: " + end);
-        }
-        return null;
-      }
-
-      try {
-        idxReader.init();
-        startOffset = idxReader.find(start);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-      try {
-        endOffset = idxReader.find(end);
-        if (endOffset == -1) {
-          endOffset = idxReader.find(end, true);
-        }
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-
-      // if startOffset == -1 then case 2-1 or case 3
-      if (startOffset == -1) { // this is a hack
-        // if case 2-1 or case 3
-        try {
-          startOffset = idxReader.find(start, true);
-        } catch (IOException ioe) {
-          LOG.error("State Dump (the requested range: "
-              + "[" + start + ", " + end + ")" + ", idx min: "
-              + idxReader.getFirstKey() + ", idx max: "
-              + idxReader.getLastKey());
-          throw ioe;
-        }
-      }
-
-      if (startOffset == -1) {
-        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-            "State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-      }
-
-      // if greater than indexed values
-      if (last || (endOffset == -1
-          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-        endOffset = data.length();
-      }
-    } finally {
-      idxReader.release();
-    }
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-
-    if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-
-  public static List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
-    }
-    return ret;
-  }
-
-  public static Map<String, List<String>> decodeParams(String uri) {
-    final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
-    final List<String> types = params.get("type");
-    final List<String> qids = params.get("qid");
-    final List<String> ebIds = params.get("sid");
-    final List<String> partIds = params.get("p");
-
-    if (types == null || ebIds == null || qids == null || partIds == null) {
-      throw new IllegalArgumentException("invalid params. required :" + params);
-    }
-
-    if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
-      throw new IllegalArgumentException("invalid params. required :" + params);
-    }
-
-    return params;
-  }
-
-  public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            queryId,
-            "output",
-            executionBlockSequenceId);
-    return workDir;
-  }
-
-  public static Path getBaseInputDir(String queryId, String executionBlockId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            queryId,
-            "in",
-            executionBlockId);
-    return workDir;
-  }
 }
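
For readers following the refactoring above: channelRead() now parses the URI once into a PullServerParams and branches on the request type, rather than pulling individual query parameters inline as the removed code did. A minimal standalone sketch of that dispatch shape, assuming a hypothetical rtype parameter and a local stand-in for PullServerUtil.isChunkRequest() (the real keys live in PullServerParams):

    import io.netty.handler.codec.http.QueryStringDecoder;

    import java.util.List;
    import java.util.Map;

    public class RequestDispatchSketch {
      // Stand-in for PullServerUtil.isChunkRequest(); the "chunk" literal is an assumption.
      static boolean isChunkRequest(String requestType) {
        return "chunk".equals(requestType);
      }

      public static void main(String[] args) {
        // "rtype" is a hypothetical parameter name, used here only for illustration.
        String uri = "/?qid=q_1&sid=1&p=0&rtype=chunk";
        Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
        String requestType = params.get("rtype").get(0);
        if (isChunkRequest(requestType)) {
          System.out.println("handleChunkRequest: stream FileChunk bodies");
        } else {
          System.out.println("handleMetaRequest: return FileChunkMeta records as JSON");
        }
      }
    }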

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index 67cff21..c5f6a6a 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -36,7 +36,7 @@ public class FileChunk {
    */
   private String ebId;
 
-  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+  public FileChunk(File file, long startOffset, long length) {
     this.file = file;
     this.startOffset = startOffset;
     this.length = length;
@@ -76,6 +76,6 @@ public class FileChunk {
 
   public String toString() {
     return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
-	+ file.getAbsolutePath();
+        + file.getAbsolutePath();
   }
 }
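
The constructor change above drops a checked exception that the body could never throw (it only assigns fields), which removes useless try/catch noise from call sites. A sketch of the effect, using a local stand-in type with the same shape as the patched constructor:

    import java.io.File;

    public class FileChunkCallerSketch {
      // Local stand-in mirroring the patched constructor, which only stores its arguments.
      static class FileChunk {
        final File file;
        final long startOffset;
        final long length;

        FileChunk(File file, long startOffset, long length) {
          this.file = file;
          this.startOffset = startOffset;
          this.length = length;
        }
      }

      public static void main(String[] args) {
        // No FileNotFoundException handling is needed at construction time any more.
        FileChunk chunk = new FileChunk(new File("/tmp/output"), 0, 1024);
        System.out.println(chunk.file + " [" + chunk.startOffset + ", " + chunk.length + ")");
      }
    }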

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
new file mode 100644
index 0000000..3f6b3eb
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+public class FileChunkMeta {
+  private final long startOffset;
+  private final long length;
+  private final String ebId;
+  private final String taskId;
+
+  public FileChunkMeta(long startOffset, long length, String ebId, String taskId) {
+    this.startOffset = startOffset;
+    this.length = length;
+    this.ebId = ebId;
+    this.taskId = taskId;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public String getEbId() {
+    return ebId;
+  }
+
+  public String toString() {
+    return "ebId: " + ebId + ", taskId: " + taskId + " (" + startOffset + ", " + length + ")";
+  }
+}
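
FileChunkMeta is the record that handleMetaRequest() above serializes with Gson before streaming it back to the fetcher. A minimal sketch of that round trip; the execution block id and task id values below are made up:

    import com.google.gson.Gson;
    import org.apache.tajo.pullserver.retriever.FileChunkMeta;

    public class FileChunkMetaJsonSketch {
      public static void main(String[] args) {
        Gson gson = new Gson();
        // Serialize a chunk descriptor the way the meta endpoint does.
        FileChunkMeta meta = new FileChunkMeta(0L, 4096L, "eb_1_000001", "1");
        String json = gson.toJson(meta);
        System.out.println(json);
        // A fetcher-side consumer can decode it back; Gson sets the final fields reflectively.
        FileChunkMeta decoded = gson.fromJson(json, FileChunkMeta.class);
        System.out.println(decoded);
      }
    }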

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
new file mode 100644
index 0000000..2a71c65
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+
+public class IndexCacheKey {
+  private Path path;
+  private String queryId;
+  private String ebSeqId;
+
+  public IndexCacheKey(Path path, String queryId, String ebSeqId) {
+    this.path = path;
+    this.queryId = queryId;
+    this.ebSeqId = ebSeqId;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public String getEbSeqId() {
+    return ebSeqId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof IndexCacheKey) {
+      IndexCacheKey other = (IndexCacheKey) o;
+      return Objects.equals(this.path, other.path)
+          && Objects.equals(this.queryId, other.queryId)
+          && Objects.equals(this.ebSeqId, other.ebSeqId);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path, queryId, ebSeqId);
+  }
+}
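
IndexCacheKey implements equals()/hashCode() over (path, queryId, ebSeqId) precisely so it can key the Guava index-reader cache and removal listener shown earlier in this patch. A minimal sketch of that pattern, with String standing in for BSTIndexReader to stay self-contained:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.pullserver.retriever.IndexCacheKey;

    public class IndexCacheSketch {
      public static void main(String[] args) {
        // Closing the evicted reader would happen here; this sketch only logs the key.
        RemovalListener<IndexCacheKey, String> onEvict =
            removal -> System.out.println("evicted: " + removal.getKey());

        Cache<IndexCacheKey, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)
            .removalListener(onEvict)
            .build();

        cache.put(new IndexCacheKey(new Path("/tmp/out"), "q_1", "1"), "reader");
        // Equal field values produce an equal key, so this lookup is a cache hit.
        System.out.println(cache.getIfPresent(new IndexCacheKey(new Path("/tmp/out"), "q_1", "1")));
      }
    }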

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn/pom.xml b/tajo-yarn/pom.xml
new file mode 100644
index 0000000..70511a1
--- /dev/null
+++ b/tajo-yarn/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tajo-yarn</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Yarn</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.4.1</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.iq80.snappy</groupId>
+          <artifactId>snappy</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.tajo</groupId>
+          <artifactId>tajo-plan</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-orc</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop-bundle</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>trevni-avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>trevni-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-pullserver</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.tajo</groupId>
+          <artifactId>tajo-rpc-protobuf</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>src</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-source-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- builds source jars and attaches them to the project for publishing -->
+                <id>tajo-java-sources</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar-no-fork</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>
\ No newline at end of file
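
The jar-with-dependencies assembly above is what lets the pull server be dropped onto a NodeManager classpath and registered as a YARN auxiliary service. A hedged yarn-site.xml sketch of such a registration; the service name tajo_pullserver and the handler class below are illustrative assumptions, not values taken from this patch:

    <!-- Illustrative only: the service name and class are assumptions. -->
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,tajo_pullserver</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.tajo_pullserver.class</name>
      <value>org.apache.tajo.yarn.TajoPullServerService</value>
    </property>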

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
new file mode 100644
index 0000000..3c0a76f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}
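
FadvisedChunkedFile is the SSL-path counterpart of FadvisedFileRegion: zero-copy transferTo cannot be used over an encrypted channel, so the file is streamed chunk by chunk while still scheduling readahead per chunk and dropping the byte range from the OS page cache on close. A construction sketch under assumed values (the path, chunk size, and readahead length below are made up):

    import org.apache.hadoop.io.ReadaheadPool;
    import org.apache.tajo.yarn.FadvisedChunkedFile;

    import java.io.RandomAccessFile;

    public class ChunkedFileSketch {
      public static void main(String[] args) throws Exception {
        RandomAccessFile spill = new RandomAccessFile("/tmp/shuffle-output", "r");
        FadvisedChunkedFile chunk = new FadvisedChunkedFile(
            spill, 0, spill.length(),
            8 * 1024,                  // chunk size (illustrative)
            true,                      // manageOsCache
            4 * 1024 * 1024,           // readaheadLength (illustrative)
            ReadaheadPool.getInstance(),
            "/tmp/shuffle-output");    // identifier used in readahead/fadvise calls
        try {
          while (chunk.hasNextChunk()) {
            chunk.nextChunk();         // a Netty pipeline would write each chunk out
          }
        } finally {
          chunk.close();               // cancels readahead and fadvises DONTNEED
        }
      }
    }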