Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:10 UTC

[3/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
new file mode 100644
index 0000000..e346bdf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.rpc.NettyUtils;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LocalFetcher retrieves locally stored data. Its behavior differs depending on whether the pull server is running
+ * externally or internally.
+ *
+ * <ul>
+ *   <li>When an internal pull server is running, local fetchers can retrieve data directly.</li>
+ *   <li>When an external pull server is running,
+ *   <ul>
+ *     <li>If the shuffle type is hash, local fetchers can still retrieve data directly.</li>
+ *     <li>If the shuffle type is range, local fetchers need to get the meta information of the data via HTTP. Once
+ *     the meta information is retrieved, they can read the data directly.</li>
+ *   </ul></li>
+ * </ul>
+ */
+public class LocalFetcher extends AbstractFetcher {
+
+  private final static Log LOG = LogFactory.getLog(LocalFetcher.class);
+
+  private final TajoPullServerService pullServerService;
+
+  private final String host;
+  private int port;
+  private final Bootstrap bootstrap;
+  private final int maxUrlLength;
+  private final List<FileChunkMeta> chunkMetas = new ArrayList<>();
+  private final String tableName;
+  private final FileSystem localFileSystem;
+  private final LocalDirAllocator localDirAllocator;
+
+  @VisibleForTesting
+  public LocalFetcher(TajoConf conf, URI uri, String tableName) throws IOException {
+    super(conf, uri);
+    this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    this.tableName = tableName;
+    this.localFileSystem = new LocalFileSystem();
+    this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    this.pullServerService = null;
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new Bootstrap()
+        .group(
+            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+            conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+        .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  public LocalFetcher(TajoConf conf, URI uri, ExecutionBlockContext executionBlockContext, String tableName) {
+    super(conf, uri);
+    this.localFileSystem = executionBlockContext.getLocalFS();
+    this.localDirAllocator = executionBlockContext.getLocalDirAllocator();
+    this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    this.tableName = tableName;
+
+    Optional<TajoPullServerService> optional = executionBlockContext.getSharedResource().getPullServerService();
+    if (optional.isPresent()) {
+      // local pull server service
+      this.pullServerService = optional.get();
+      this.host = null;
+      this.bootstrap = null;
+
+    } else if (PullServerUtil.useExternalPullServerService(conf)) {
+      // external pull server service
+      pullServerService = null;
+
+      String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+      this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+      this.port = uri.getPort();
+      if (port == -1) {
+        if (scheme.equalsIgnoreCase("http")) {
+          this.port = 80;
+        } else if (scheme.equalsIgnoreCase("https")) {
+          this.port = 443;
+        }
+      }
+
+      bootstrap = new Bootstrap()
+          .group(
+              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                  conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+              conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+          .option(ChannelOption.TCP_NODELAY, true);
+    } else {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new TajoInternalError("Pull server service is not initialized");
+    }
+  }
+
+  @Override
+  public List<FileChunk> get() throws IOException {
+    this.startTime = System.currentTimeMillis();
+    return pullServerService != null ? getWithInternalPullServer() : getWithExternalPullServer();
+  }
+
+  private List<FileChunk> getWithInternalPullServer() throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+    PullServerParams params = new PullServerParams(uri.toString());
+    try {
+      fileChunks.addAll(pullServerService.getFileChunks(conf, localDirAllocator, params));
+    } catch (ExecutionException e) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new TajoInternalError(e);
+    }
+    fileChunks.forEach(c -> c.setEbId(tableName));
+    endFetch(FetcherState.FETCH_DATA_FINISHED);
+    if (fileChunks.size() > 0) {
+      fileLen = fileChunks.get(0).length();
+      fileNum = 1;
+    } else {
+      fileNum = 0;
+      fileLen = 0;
+    }
+    return fileChunks;
+  }
+
+  private List<FileChunk> getWithExternalPullServer() throws IOException {
+    final PullServerParams params = new PullServerParams(uri.toString());
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+
+    if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+      return getChunksForRangeShuffle(params, queryBaseDir);
+    } else if (PullServerUtil.isHashShuffle(params.shuffleType())) {
+      return getChunksForHashShuffle(params, queryBaseDir);
+    } else {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IllegalArgumentException("unknown shuffle type: " + params.shuffleType());
+    }
+  }
+
+  private List<FileChunk> getChunksForHashShuffle(final PullServerParams params, final Path queryBaseDir)
+      throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+    final String partId = params.partId();
+    final long offset = params.offset();
+    final long length = params.length();
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    if (!localDirAllocator.ifExists(partPath.toString(), conf)) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+    }
+    final Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(partPath.toString(), conf));
+    final File file = new File(path.toUri());
+    final long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+    final long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+    if (startPos >= file.length()) {
+      endFetch(FetcherState.FETCH_FAILED);
+      throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+    }
+    if (readLen > 0) {
+      final FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunk.setEbId(tableName);
+      chunk.setFromRemote(false);
+      fileChunks.add(chunk);
+      fileLen = chunk.length();
+      fileNum = 1;
+    }
+
+    endFetch(FetcherState.FETCH_DATA_FINISHED);
+    return fileChunks;
+  }
+
+  private List<FileChunk> getChunksForRangeShuffle(final PullServerParams params, final Path queryBaseDir)
+      throws IOException {
+    final List<FileChunk> fileChunks = new ArrayList<>();
+
+    if (state == FetcherState.FETCH_INIT) {
+      final ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer();
+      bootstrap.handler(initializer);
+    }
+
+    this.state = FetcherState.FETCH_META_FETCHING;
+    ChannelFuture future = null;
+    try {
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+          .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().channel();
+      if (!future.isSuccess()) {
+        endFetch(FetcherState.FETCH_FAILED);
+        throw new IOException(future.cause());
+      }
+
+      for (URI eachURI : createChunkMetaRequestURIs(host, port, params)) {
+        String query = eachURI.getPath()
+            + (eachURI.getRawQuery() != null ? "?" + eachURI.getRawQuery() : "");
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+        request.headers().set(HttpHeaders.Names.HOST, host);
+        request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+        request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Status: " + getState() + ", URI:" + eachURI);
+        }
+        // Send the HTTP request.
+        channel.writeAndFlush(request);
+      }
+      // Wait for the server to close the connection; throws an exception on failure
+      channel.closeFuture().syncUninterruptibly();
+
+      if (!state.equals(FetcherState.FETCH_META_FINISHED)) {
+        endFetch(FetcherState.FETCH_FAILED);
+      } else {
+        state = FetcherState.FETCH_DATA_FETCHING;
+        fileLen = fileNum = 0;
+        for (FileChunkMeta eachMeta : chunkMetas) {
+          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachMeta.getTaskId(), "output");
+          if (!localDirAllocator.ifExists(outputPath.toString(), conf)) {
+            LOG.warn("Range shuffle - file not exist. " + outputPath);
+            continue;
+          }
+          Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(outputPath.toString(), conf));
+          File file = new File(URI.create(path.toUri() + "/output"));
+          FileChunk chunk = new FileChunk(file, eachMeta.getStartOffset(), eachMeta.getLength());
+          chunk.setEbId(tableName);
+          fileChunks.add(chunk);
+          fileLen += chunk.length();
+          fileNum++;
+        }
+        endFetch(FetcherState.FETCH_DATA_FINISHED);
+      }
+
+      return fileChunks;
+    } finally {
+      if(future != null && future.channel().isOpen()){
+        // Close the channel to exit.
+        future.channel().close().awaitUninterruptibly();
+      }
+    }
+  }
+
+  public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+    private int length = -1;
+    private int totalReceivedContentLength = 0;
+    private byte[] buf;
+    private final Gson gson = new Gson();
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      messageReceiveCount++;
+      if (msg instanceof HttpResponse) {
+        try {
+          HttpResponse response = (HttpResponse) msg;
+
+          StringBuilder sb = new StringBuilder();
+          if (LOG.isDebugEnabled()) {
+            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+                .append(response.getProtocolVersion()).append(", HEADER: ");
+          }
+          if (!response.headers().names().isEmpty()) {
+            for (String name : response.headers().names()) {
+              for (String value : response.headers().getAll(name)) {
+                if (LOG.isDebugEnabled()) {
+                  sb.append(name).append(" = ").append(value);
+                }
+                if (this.length == -1 && name.equals("Content-Length")) {
+                  this.length = Integer.parseInt(value);
+                  if (buf == null || buf.length < this.length) {
+                    buf = new byte[this.length];
+                  }
+                }
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sb.toString());
+          }
+
+          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
+            LOG.warn("There are no data corresponding to the request");
+            length = 0;
+            return;
+          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+            LOG.error(response.getStatus().reasonPhrase());
+            state = TajoProtos.FetcherState.FETCH_FAILED;
+            return;
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+
+      if (msg instanceof HttpContent) {
+        HttpContent httpContent = (HttpContent) msg;
+        ByteBuf content = httpContent.content();
+
+        if (state != FetcherState.FETCH_FAILED) {
+          try {
+            if (content.isReadable()) {
+              int contentLength = content.readableBytes();
+              content.readBytes(buf, totalReceivedContentLength, contentLength);
+              totalReceivedContentLength += contentLength;
+            }
+
+            if (msg instanceof LastHttpContent) {
+              if (totalReceivedContentLength == length) {
+                state = FetcherState.FETCH_META_FINISHED;
+
+                List<String> jsonMetas = gson.fromJson(new String(buf), List.class);
+                for (String eachJson : jsonMetas) {
+                  FileChunkMeta meta = gson.fromJson(eachJson, FileChunkMeta.class);
+                  chunkMetas.add(meta);
+                }
+                totalReceivedContentLength = 0;
+                length = -1;
+              } else {
+                endFetch(FetcherState.FETCH_FAILED);
+                throw new IOException("Invalid fetch meta length: " + totalReceivedContentLength + ", expected length: "
+                    + length);
+              }
+            }
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          } finally {
+            ReferenceCountUtil.release(msg);
+          }
+        } else {
+          // The HTTP content contains the reason why the fetch failed.
+          LOG.error(content.toString(Charset.defaultCharset()));
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause.getMessage(), cause);
+      } else {
+        LOG.error("Fetch failed :", cause);
+      }
+
+      // this fetch will be retried
+      finishTime = System.currentTimeMillis();
+      state = TajoProtos.FetcherState.FETCH_FAILED;
+      ctx.close();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      if(getState() == FetcherState.FETCH_INIT || getState() == FetcherState.FETCH_META_FETCHING){
+        // the channel was closed before the fetch could complete
+        finishTime = System.currentTimeMillis();
+        LOG.error("Channel closed by peer: " + ctx.channel());
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+      }
+
+      super.channelUnregistered(ctx);
+    }
+  }
+
+  public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
+
+      int maxChunkSize = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+      pipeline.addLast("handler", new HttpClientHandler());
+    }
+  }
+
+  private List<URI> createChunkMetaRequestURIs(String pullServerAddr, int pullServerPort, PullServerParams params) {
+    final PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder(pullServerAddr, pullServerPort, maxUrlLength);
+    builder.setRequestType(PullServerConstants.META_REQUEST_PARAM_STRING)
+        .setQueryId(params.queryId())
+        .setShuffleType(params.shuffleType())
+        .setEbId(params.ebId())
+        .setPartId(params.partId());
+
+    if (params.contains(Param.OFFSET)) {
+      builder.setOffset(params.offset()).setLength(params.length());
+    }
+
+    if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+      builder.setStartKeyBase64(params.startKey())
+          .setEndKeyBase64(params.endKey())
+          .setLastInclude(params.last());
+    }
+
+    if (params.contains(Param.TASK_ID)) {
+      builder.setTaskAttemptIds(params.taskAttemptIds());
+    }
+    return builder.build(true);
+  }
+}
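For illustration, here is a minimal sketch of how the @VisibleForTesting constructor above might be exercised. It is not part of the patch: the host, port, and identifier values are hypothetical, and the query parameters (qid, sid, p, type) follow the pull-server request format used elsewhere in this patch.

  import java.net.URI;
  import java.util.List;
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.pullserver.retriever.FileChunk;
  import org.apache.tajo.worker.LocalFetcher;

  public class LocalFetcherSketch {
    public static void main(String[] args) throws Exception {
      TajoConf conf = new TajoConf();
      // Hypothetical hash-shuffle request against a local pull server.
      URI uri = URI.create("http://localhost:28080/?qid=q_0001&sid=1&p=0&type=h");
      LocalFetcher fetcher = new LocalFetcher(conf, uri, "eb_0000000001_0001_000001");
      // For hash shuffle, the chunks are read directly from local disk.
      List<FileChunk> chunks = fetcher.get();
      for (FileChunk chunk : chunks) {
        System.out.println(chunk.getFile() + " offset=" + chunk.startOffset()
            + " length=" + chunk.length());
      }
    }
  }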

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
new file mode 100644
index 0000000..9a8a70b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NettyUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * RemoteFetcher fetches data from a given URI via the HTTP protocol and stores it in a specific file. It aims at
+ * asynchronous and efficient data transmission.
+ */
+public class RemoteFetcher extends AbstractFetcher {
+
+  private final static Log LOG = LogFactory.getLog(RemoteFetcher.class);
+
+  private final String host;
+  private int port;
+
+  private final Bootstrap bootstrap;
+  private final List<Long> chunkLengths = new ArrayList<>();
+
+  public RemoteFetcher(TajoConf conf, URI uri, FileChunk chunk) {
+    super(conf, uri, chunk);
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new Bootstrap()
+        .group(
+            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+            conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+        .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  @Override
+  public List<FileChunk> get() throws IOException {
+    List<FileChunk> fileChunks = new ArrayList<>();
+
+    if (state == FetcherState.FETCH_INIT) {
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
+    }
+
+    this.startTime = System.currentTimeMillis();
+    this.state = FetcherState.FETCH_DATA_FETCHING;
+    ChannelFuture future = null;
+    try {
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().channel();
+      if (!future.isSuccess()) {
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+        throw new IOException(future.cause());
+      }
+
+      String query = uri.getPath()
+          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+      // Prepare the HTTP request.
+      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+      request.headers().set(HttpHeaders.Names.HOST, host);
+      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
+      }
+      // Send the HTTP request.
+      channel.writeAndFlush(request);
+
+      // Wait for the server to close the connection; throws an exception on failure
+      channel.closeFuture().syncUninterruptibly();
+
+      fileChunk.setLength(fileChunk.getFile().length());
+
+      long start = 0;
+      for (Long eachChunkLength : chunkLengths) {
+        if (eachChunkLength == 0) continue;
+        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+        chunk.setEbId(fileChunk.getEbId());
+        chunk.setFromRemote(true);
+        fileChunks.add(chunk);
+        start += eachChunkLength;
+      }
+      return fileChunks;
+
+    } finally {
+      if(future != null && future.channel().isOpen()){
+        // Close the channel to exit.
+        future.channel().close().awaitUninterruptibly();
+      }
+
+      this.finishTime = System.currentTimeMillis();
+      long elapsedMills = finishTime - startTime;
+      String transferSpeed;
+      if(elapsedMills > 1000) {
+        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
+        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
+      } else {
+        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
+      }
+
+      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
+          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
+    }
+  }
+
+  public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    private long length = -1;
+    private long totalReceivedContentLength = 0; // long, since the received size may exceed Integer.MAX_VALUE
+
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+      this.raf = new RandomAccessFile(file, "rw");
+      this.fc = raf.getChannel();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      messageReceiveCount++;
+      if (msg instanceof HttpResponse) {
+        try {
+          HttpResponse response = (HttpResponse) msg;
+
+          StringBuilder sb = new StringBuilder();
+          if (LOG.isDebugEnabled()) {
+            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+                .append(response.getProtocolVersion()).append(", HEADER: ");
+          }
+          if (!response.headers().names().isEmpty()) {
+            for (String name : response.headers().names()) {
+              for (String value : response.headers().getAll(name)) {
+                if (LOG.isDebugEnabled()) {
+                  sb.append(name).append(" = ").append(value);
+                }
+                if (this.length == -1 && name.equals("Content-Length")) {
+                  this.length = Long.parseLong(value);
+                }
+              }
+            }
+            if (response.headers().contains(PullServerConstants.CHUNK_LENGTH_HEADER_NAME)) {
+              String stringOffset = response.headers().get(PullServerConstants.CHUNK_LENGTH_HEADER_NAME);
+
+              for (String eachSplit : stringOffset.split(",")) {
+                chunkLengths.add(Long.parseLong(eachSplit));
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sb.toString());
+          }
+
+          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
+            LOG.warn("There are no data corresponding to the request");
+            length = 0;
+            return;
+          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+            LOG.error(response.getStatus().reasonPhrase(), response.getDecoderResult().cause());
+            endFetch(FetcherState.FETCH_FAILED);
+            return;
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+
+      if (msg instanceof HttpContent) {
+        HttpContent httpContent = (HttpContent) msg;
+        ByteBuf content = httpContent.content();
+
+        if (state != FetcherState.FETCH_FAILED) {
+          try {
+            if (content.isReadable()) {
+              totalReceivedContentLength += content.readableBytes();
+              content.readBytes(fc, content.readableBytes());
+            }
+
+            if (msg instanceof LastHttpContent) {
+              if (raf != null) {
+                fileLen = file.length();
+                fileNum = 1;
+              }
+
+              if (totalReceivedContentLength == length) {
+                endFetch(FetcherState.FETCH_DATA_FINISHED);
+              } else {
+                endFetch(FetcherState.FETCH_FAILED);
+                throw new IOException("Invalid fetch length: " + totalReceivedContentLength + ", but expected " + length);
+              }
+              IOUtils.cleanup(LOG, fc, raf);
+            }
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          } finally {
+            ReferenceCountUtil.release(msg);
+          }
+        } else {
+          // The HTTP content contains the reason why the fetch failed.
+          LOG.error(content.toString(Charset.defaultCharset()));
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause.getMessage(), cause);
+      } else {
+        LOG.error("Fetch failed :", cause);
+      }
+
+      // this fetch will be retried
+      IOUtils.cleanup(LOG, fc, raf);
+      endFetch(FetcherState.FETCH_FAILED);
+      ctx.close();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      if(getState() != FetcherState.FETCH_DATA_FINISHED){
+        // the channel was closed before the fetch could complete
+        endFetch(FetcherState.FETCH_FAILED);
+        LOG.error("Channel closed by peer: " + ctx.channel());
+      }
+      IOUtils.cleanup(LOG, fc, raf);
+
+      super.channelUnregistered(ctx);
+    }
+  }
+
+  public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+    private final File file;
+
+    public HttpClientChannelInitializer(File file) {
+      this.file = file;
+    }
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
+
+      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+      pipeline.addLast("handler", new HttpClientHandler(file));
+    }
+  }
+}
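Similarly, a hedged sketch of how a RemoteFetcher is driven (TaskImpl below builds it the same way). The URI, file path, and IDs are hypothetical; length -1 on the store chunk marks an unknown size that is filled in from the file length when the fetch completes, as in the diff above.

  import java.io.File;
  import java.net.URI;
  import java.util.List;
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.pullserver.retriever.FileChunk;
  import org.apache.tajo.worker.RemoteFetcher;

  public class RemoteFetcherSketch {
    public static void main(String[] args) throws Exception {
      TajoConf conf = new TajoConf();
      URI uri = URI.create("http://192.168.10.2:28080/?qid=q_0001&sid=1&p=0&type=h");
      // The store chunk names the local file the remote data is streamed into.
      FileChunk store = new FileChunk(new File("/tmp/in_0"), 0, -1);
      store.setFromRemote(true);
      store.setEbId("eb_0000000001_0001_000001");
      List<FileChunk> fetched = new RemoteFetcher(conf, uri, store).get();
      System.out.println("fetched " + fetched.size() + " chunk(s) into " + store.getFile());
    }
  }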

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index de337ae..6296bb0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.metrics.Node;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.pullserver.PullServerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.QueryMaster;
 import org.apache.tajo.querymaster.QueryMasterManagerService;
@@ -64,7 +65,6 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
@@ -144,7 +144,12 @@ public class TajoWorker extends CompositeService {
     queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
     addIfService(queryMasterManagerService);
 
-    this.taskManager = new TaskManager(dispatcher, workerContext);
+    if (!PullServerUtil.useExternalPullServerService(systemConf)) {
+      pullService = new TajoPullServerService();
+      addIfService(pullService);
+    }
+
+    this.taskManager = new TaskManager(dispatcher, workerContext, pullService);
     addService(taskManager);
 
     this.taskExecutor = new TaskExecutor(workerContext);
@@ -158,21 +163,16 @@ public class TajoWorker extends CompositeService {
     addService(new NodeStatusUpdater(workerContext));
 
     int httpPort = 0;
-    if(!TajoPullServerService.isStandalone()) {
-      pullService = new TajoPullServerService();
-      addIfService(pullService);
-    }
-
     if (!systemConf.getBoolVar(ConfVars.$TEST_MODE)) {
       httpPort = initWebServer();
     }
 
     super.serviceInit(conf);
 
-    int pullServerPort;
+    int pullServerPort = systemConf.getIntVar(ConfVars.PULLSERVER_PORT);
     if(pullService != null){
       pullServerPort = pullService.getPort();
-    } else {
+    } else if (TajoPullServerService.isStandalone()) {
       pullServerPort = getStandAlonePullServerPort();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 19d5da4..4a5b0b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -54,5 +54,5 @@ public interface Task {
 
   TaskHistory createTaskHistory();
 
-  List<Fetcher> getFetchers();
+  List<AbstractFetcher> getFetchers();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 55eb02a..920dfe5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
@@ -53,7 +54,6 @@ import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.rpc.NullCallback;
@@ -84,7 +84,7 @@ public class TaskImpl implements Task {
   private final Path taskDir;
 
   private final TaskAttemptContext context;
-  private List<Fetcher> fetcherRunners;
+  private List<AbstractFetcher> fetcherRunners;
   private LogicalNode plan;
   private PhysicalExec executor;
 
@@ -269,7 +269,7 @@ public class TaskImpl implements Task {
       return taskIdStr1.compareTo(taskIdStr2);
     });
 
-    for (Fetcher f : fetcherRunners) {
+    for (AbstractFetcher f : fetcherRunners) {
       fetcherExecutor.submit(new FetchRunner(context, f));
     }
   }
@@ -516,14 +516,14 @@ public class TaskImpl implements Task {
         taskHistory.setTotalFetchCount(fetcherRunners.size());
         int i = 0;
         FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
-        for (Fetcher fetcher : fetcherRunners) {
+        for (AbstractFetcher fetcher : fetcherRunners) {
           builder.setStartTime(fetcher.getStartTime());
           builder.setFinishTime(fetcher.getFinishTime());
           builder.setFileLength(fetcher.getFileLen());
           builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
           builder.setState(fetcher.getState());
           taskHistory.addFetcherHistory(builder.build());
-          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
+          if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) i++;
         }
         taskHistory.setFinishedFetchCount(i);
       }
@@ -534,7 +534,7 @@ public class TaskImpl implements Task {
     return taskHistory;
   }
 
-  public List<Fetcher> getFetchers() {
+  public List<AbstractFetcher> getFetchers() {
     return fetcherRunners;
   }
 
@@ -585,10 +585,10 @@ public class TaskImpl implements Task {
 
   private class FetchRunner implements Runnable {
     private final TaskAttemptContext ctx;
-    private final Fetcher fetcher;
+    private final AbstractFetcher fetcher;
     private int maxRetryNum;
 
-    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+    public FetchRunner(TaskAttemptContext ctx, AbstractFetcher fetcher) {
       this.ctx = ctx;
       this.fetcher = fetcher;
       this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
@@ -612,7 +612,7 @@ public class TaskImpl implements Task {
           }
           try {
             List<FileChunk> fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+            if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) {
               for (FileChunk eachFetch : fetched) {
                 if (eachFetch.getFile() != null) {
                   if (!eachFetch.fromRemote()) {
@@ -630,7 +630,7 @@ public class TaskImpl implements Task {
           retryNum++;
         }
       } finally {
-        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
+        if(fetcher.getState() == FetcherState.FETCH_DATA_FINISHED){
           fetcherFinished(ctx);
         } else {
           if (retryNum == maxRetryNum) {
@@ -669,8 +669,8 @@ public class TaskImpl implements Task {
     }
   }
 
-  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchProto> fetches) throws IOException {
+  private List<AbstractFetcher> getFetchRunners(TaskAttemptContext ctx,
+                                                List<FetchProto> fetches) throws IOException {
 
     if (fetches.size() > 0) {
       Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -681,7 +681,7 @@ public class TaskImpl implements Task {
       File storeDir;
       File defaultStoreFile;
       List<FileChunk> storeChunkList = new ArrayList<>();
-      List<Fetcher> runnerList = Lists.newArrayList();
+      List<AbstractFetcher> runnerList = Lists.newArrayList();
 
       for (FetchProto f : fetches) {
         storeDir = new File(inputDir.toString(), f.getName());
@@ -696,43 +696,16 @@ public class TaskImpl implements Task {
 
           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-
-            List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
-
-            for (FileChunk localChunk : localChunkCandidates) {
-              // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-              // So, we should skip and don't need to create storeChunk.
-              if (localChunk == null || localChunk.length() == 0) {
-                continue;
-              }
-
-              if (localChunk.getFile() != null && localChunk.startOffset() > -1) {
-                localChunk.setFromRemote(false);
-                localStoreChunkCount++;
-              } else {
-                localChunk = new FileChunk(defaultStoreFile, 0, -1);
-                localChunk.setFromRemote(true);
-              }
-              localChunk.setEbId(f.getName());
-              storeChunkList.add(localChunk);
-            }
-
+            localStoreChunkCount++;
+            runnerList.add(new LocalFetcher(systemConf, uri, executionBlockContext, f.getName()));
           } else {
+            // The intermediate data must be fetched from a remote host, so remoteChunk
+            // represents a complete file rather than only a part of one
             FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
             remoteChunk.setFromRemote(true);
             remoteChunk.setEbId(f.getName());
-            storeChunkList.add(remoteChunk);
-          }
-
-          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
-          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-          for (FileChunk eachChunk : storeChunkList) {
-            Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
-            runnerList.add(fetcher);
+            runnerList.add(new RemoteFetcher(systemConf, uri, remoteChunk));
             i++;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
-            }
           }
         }
       }
@@ -745,96 +718,9 @@ public class TaskImpl implements Task {
     }
   }
 
-  private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
-    // Parse the URI
-
-    // Parsing the URL into key-values
-    final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
-
-    String partId = params.get("p").get(0);
-    String queryId = params.get("qid").get(0);
-    String shuffleType = params.get("type").get(0);
-    String sid =  params.get("sid").get(0);
-
-    final List<String> taskIdList = params.get("ta");
-    final List<String> offsetList = params.get("offset");
-    final List<String> lengthList = params.get("length");
-
-    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-          + ", taskIds=" + taskIdList);
-    }
-
-    // The working directory of Tajo worker for each query, including stage
-    Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
-
-    List<FileChunk> chunkList = new ArrayList<>();
-    // If the stage requires a range shuffle
-    if (shuffleType.equals("r")) {
-
-      final String startKey = params.get("start").get(0);
-      final String endKey = params.get("end").get(0);
-      final boolean last = params.get("final") != null;
-      final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
-
-      long before = System.currentTimeMillis();
-      for (String eachTaskId : taskIds) {
-        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
-        if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
-          LOG.warn("Range shuffle - file not exist. " + outputPath);
-          continue;
-        }
-        Path path = executionBlockContext.getLocalFS().makeQualified(
-            executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
-
-        try {
-          FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
-          chunkList.add(chunk);
-        } catch (Throwable t) {
-          LOG.error(t.getMessage(), t);
-          throw new IOException(t.getCause());
-        }
-      }
-      long after = System.currentTimeMillis();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Index lookup time: " + (after - before) + " ms");
-      }
-
-      // If the stage requires a hash shuffle or a scattered hash shuffle
-    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-      Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
-
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
-        throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
-      File file = new File(path.toUri());
-      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
-      if (startPos >= file.length()) {
-        throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-      }
-      FileChunk chunk = new FileChunk(file, startPos, readLen);
-      chunkList.add(chunk);
-
-    } else {
-      throw new IOException("Unknown shuffle type");
-    }
-
-    return chunkList;
-  }
-
   public static Path getTaskAttemptDir(TaskAttemptId quid) {
-    Path workDir =
-        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-            String.valueOf(quid.getTaskId().getId()),
-            String.valueOf(quid.getId()));
-    return workDir;
+    return StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+        String.valueOf(quid.getTaskId().getId()),
+        String.valueOf(quid.getId()));
   }
 }
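The net effect of the TaskImpl change above is a per-URI fetcher choice: if the fetch address is local and its port matches this worker's pull-server port, a LocalFetcher reads from local disk; otherwise a RemoteFetcher streams the file over HTTP. A self-contained sketch of that predicate follows; the URI, the port value standing in for conn.getPullServerPort(), and the NetUtils import path are assumptions for illustration only.

  import java.net.InetSocketAddress;
  import java.net.URI;
  import org.apache.tajo.util.NetUtils;

  public class FetcherSelectionSketch {
    public static void main(String[] args) {
      // Hypothetical fetch URI; 28080 stands in for this worker's pull-server port.
      URI uri = URI.create("http://127.0.0.1:28080/?qid=q_0001&sid=1&p=0&type=h");
      int pullServerPort = 28080;
      InetSocketAddress address = new InetSocketAddress(uri.getHost(), uri.getPort());
      if (NetUtils.isLocalAddress(address) && pullServerPort == uri.getPort()) {
        System.out.println("LocalFetcher: intermediate data can be read from local disk");
      } else {
        System.out.println("RemoteFetcher: intermediate data is streamed over HTTP");
      }
    }
  }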

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 61174b3..efdda9a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -30,6 +30,7 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.RpcClientManager;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.tajo.ResourceProtos.*;
@@ -57,12 +59,19 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
   private final Dispatcher dispatcher;
   private TaskExecutor executor;
   private final Properties rpcParams;
+  private final TajoPullServerService pullServerService;
 
-  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){
-    this(dispatcher, workerContext, null);
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    this(dispatcher, workerContext, null, null);
   }
 
-  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor) {
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext,
+                     TajoPullServerService pullServerService) {
+    this(dispatcher, workerContext, null, pullServerService);
+  }
+
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor,
+                     TajoPullServerService pullServerService) {
     super(TaskManager.class.getName());
 
     this.dispatcher = dispatcher;
@@ -70,6 +79,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
     this.executionBlockContextMap = Maps.newHashMap();
     this.executor = executor;
     this.rpcParams = RpcParameterFactory.get(this.workerContext.getConf());
+    this.pullServerService = pullServerService;
   }
 
   @Override
@@ -124,7 +134,8 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
       stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
 
       ExecutionBlockContextResponse contextProto = callback.get();
-      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
+      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client,
+          pullServerService);
 
       context.init();
       return context;

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index 88b7c24..3eab5cd 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.Fetcher" %>
+<%@ page import="org.apache.tajo.worker.AbstractFetcher" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="org.apache.tajo.worker.Task" %>
 <%@ page import="org.apache.tajo.worker.TaskHistory" %>
@@ -161,7 +161,7 @@
             <th>URI</th>
         </tr>
         <%
-            for (Fetcher eachFetcher : task.getFetchers()) {
+            for (AbstractFetcher eachFetcher : task.getFetchers()) {
         %>
         <tr>
             <td><%=index%>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index 652ab84..7280e1f 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -160,6 +160,9 @@
                       run mkdir -p share/jdbc-dist
                       run cp -r $ROOT/tajo-jdbc/target/tajo-jdbc-${project.version}-jar-with-dependencies.jar ./share/jdbc-dist/tajo-jdbc-${project.version}.jar
 
+                      run mkdir -p share/yarn-dist
+                      run cp -r $ROOT/tajo-yarn/target/tajo-yarn-${project.version}-jar-with-dependencies.jar ./share/yarn-dist/tajo-yarn-${project.version}.jar
+
                       run mkdir -p extlib
 
                       echo

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration.rst b/tajo-docs/src/main/sphinx/configuration.rst
index 8b0b8a2..bef49cb 100644
--- a/tajo-docs/src/main/sphinx/configuration.rst
+++ b/tajo-docs/src/main/sphinx/configuration.rst
@@ -10,6 +10,7 @@ Configuration
     configuration/tajo_master_configuration
     configuration/worker_configuration
     configuration/catalog_configuration
+    configuration/pullserver_configuration
     configuration/ha_configuration
     configuration/service_config_defaults
     configuration/tajo-site-xml

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
index e9d17eb..3835512 100644
--- a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
+++ b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
@@ -1,9 +1,9 @@
-*******************************************
+*************
 Cluster Setup
-*******************************************
+*************
 
 Fully Distributed Mode
-==========================================
+======================
 A fully distributed mode enables a Tajo instance to run on `Hadoop Distributed File System (HDFS) <http://wiki.apache.org/hadoop/HDFS>`_. In this mode, a number of Tajo workers run across a number of the physical nodes where HDFS data nodes run.
 
 
@@ -11,7 +11,7 @@ In this section, we explain how to setup the cluster mode.
 
 
 Settings
---------------------------------------------------------
+--------
 
 Please add the following configs to tajo-site.xml file:
 
@@ -43,7 +43,7 @@ Please add the following configs to tajo-site.xml file:
   </property>
 
 Workers
---------------------------------------------------------
+-------
 
 The file ``conf/workers`` lists all host names of workers, one per line.
 By default, this file contains the single entry ``localhost``.
@@ -59,7 +59,7 @@ For example: ::
   <ctrl + d>
 
 Make base directories and set permissions
---------------------------------------------------------
+-----------------------------------------
 
 If you want to know Tajo’s configuration in more detail, see Configuration page.
 Before launching the tajo, you should create the tajo root dir and set the permission as follows: ::
@@ -69,7 +69,7 @@ Before launching the tajo, you should create the tajo root dir and set the permi
 
 
 Launch a Tajo cluster
---------------------------------------------------------
+---------------------
 
 Then, execute ``start-tajo.sh`` ::
 
@@ -77,10 +77,10 @@ Then, execute ``start-tajo.sh`` ::
 
 .. note::
 
-  In default, each worker is set to very little resource capacity. In order to increase parallel degree, please read 
+  By default, each worker is set to very little resource capacity. In order to increase parallel degree, please read
   :doc:`/configuration/worker_configuration`.
 
 .. note::
 
-  In default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
+  By default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
new file mode 100644
index 0000000..68760fc
--- /dev/null
+++ b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
@@ -0,0 +1,75 @@
+*************************
+Pull Server Configuration
+*************************
+
+Pull servers are responsible for transmitting data among Tajo workers during shuffle phases. Tajo provides several modes
+for pull servers.
+
+Internal Mode (Default)
+=======================
+
+In the internal mode, each worker acts as a pull server. As a result, each worker must transmit data during the
+shuffle phase as well as process data during the processing phase.
+
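+Internal mode needs no extra configuration; it presumably applies whenever the standalone flag described in the next
+section is left unset in ``${TAJO_HOME}/conf/tajo-env.sh``:
+
+.. code-block:: sh
+
+  # Internal mode is the default; the standalone flag is simply not enabled.
+  # export TAJO_PULLSERVER_STANDALONE=false
+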
+Standalone Mode
+===============
+
+Sometimes, data shuffling requires a large amount of memory and CPU processing.
+This can slow down query processing because Tajo's query engine must contend with pull servers for limited resources.
+Tajo provides the standalone mode to avoid this unnecessary contention.
+
+In this mode, each pull server is executed as a separate process. To enable this mode, you need to add the following
+line to ``${TAJO_HOME}/conf/tajo-env.sh``.
+
+.. code-block:: sh
+
+  export TAJO_PULLSERVER_STANDALONE=true
+
+Then, you will see the following messages when you start up the Tajo cluster.
+
+.. code-block:: sh
+
+  Starting single TajoMaster
+  starting master, logging to ...
+  192.168.10.1: starting pullserver, logging to ...
+  192.168.10.1: starting worker, logging to ...
+  192.168.10.2: starting pullserver, logging to ...
+  192.168.10.2: starting worker, logging to ...
+  ...
+
+.. warning::
+
+  Currently, only a single pull server should be run on each machine.
+
+Yarn Auxiliary Service Mode
+===========================
+
+You can run pull servers as one of Yarn's auxiliary services. To do so, you need to add the following configurations
+to ``${HADOOP_CONF}/yarn-site.xml``.
+
+.. code-block:: xml
+
+  <property>
+    <name>yarn.nodemanager.aux-services</name>
+    <value>mapreduce_shuffle,tajo_shuffle</value>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.aux-services.tajo_shuffle.class</name>
+    <value>org.apache.tajo.yarn.TajoPullServerService</value>
+  </property>
+
+  <property>
+    <name>tajo.pullserver.port</name>
+    <value>port number</value>
+  </property>
+
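+Note that each NodeManager must also be able to load the auxiliary service class. Though the exact steps depend on
+your Hadoop installation, this typically means placing the ``tajo-yarn`` jar (built into ``share/yarn-dist`` by the
+distribution script) on the NodeManager classpath and restarting the NodeManagers. A sketch assuming a Hadoop 2.x
+layout:
+
+.. code-block:: sh
+
+  # Copy the Tajo Yarn auxiliary service jar onto the NodeManager classpath.
+  cp ${TAJO_HOME}/share/yarn-dist/tajo-yarn-*.jar ${HADOOP_HOME}/share/hadoop/yarn/lib/
+
+  # Restart each NodeManager so the new auxiliary service is loaded.
+  ${HADOOP_HOME}/sbin/yarn-daemon.sh stop nodemanager
+  ${HADOOP_HOME}/sbin/yarn-daemon.sh start nodemanager
+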
+Optionally, you can add the configuration below to specify temporary directories. For details on this configuration,
+please refer to :doc:`/configuration/worker_configuration`.
+
+.. code-block:: xml
+
+  <property>
+    <name>tajo.worker.tmpdir.locations</name>
+    <value>/path/to/tajo/temporal/directory</value>
+  </property>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index ee1317f..4818983 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -832,6 +832,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-yarn</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-client</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -991,13 +996,6 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-server-tests</artifactId>
-        <version>${hadoop.version}</version>
-        <type>test-jar</type>
-        <scope>test</scope>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
         <version>${hadoop.version}</version>
         <exclusions>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 2a1a745..6c805ac 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -56,12 +56,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc-protobuf</artifactId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-      <scope>provided</scope>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
index 643d9e0..07f0e56 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -151,6 +151,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
   protected void deallocate() {
     if (readaheadRequest != null) {
       readaheadRequest.cancel();
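+      // Drop the reference so a repeated deallocate() cannot cancel the request twice.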
+      readaheadRequest = null;
     }
     super.deallocate();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
index 9c3c523..7f97542 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -24,18 +24,9 @@ import io.netty.util.concurrent.GenericFutureListener;
 public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
 
   private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
 
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
+  public FileCloseListener(FadvisedFileRegion filePart) {
     this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
   }
 
   // TODO error handling; distinguish IO/connection failures,
@@ -46,8 +37,5 @@ public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
       filePart.transferSuccessful();
     }
     filePart.deallocate();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
new file mode 100644
index 0000000..74f96e7
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+public class PullServerConstants {
+
+  /**
+   * Pull server query parameters
+   */
+  public enum Param {
+    // Common params
+    REQUEST_TYPE("rtype"),  // can be one of 'm' for meta and 'c' for chunk.
+    SHUFFLE_TYPE("stype"),  // can be one of 'r', 'h', and 's'.
+    QUERY_ID("qid"),
+    EB_ID("sid"),
+    PART_ID("p"),
+    TASK_ID("ta"),
+    OFFSET("offset"),
+    LENGTH("length"),
+
+    // Range shuffle params
+    START("start"),
+    END("end"),
+    FINAL("final");
+
+    private String key;
+
+    Param(String key) {
+      this.key = key;
+    }
+
+    public String key() {
+      return key;
+    }
+  }
+
+  // Request types ----------------------------------------------------------
+
+  public static final String CHUNK_REQUEST_PARAM_STRING = "c";
+  public static final String META_REQUEST_PARAM_STRING = "m";
+
+  // Shuffle types ----------------------------------------------------------
+
+  public static final String RANGE_SHUFFLE_PARAM_STRING = "r";
+  public static final String HASH_SHUFFLE_PARAM_STRING = "h";
+  public static final String SCATTERED_HASH_SHUFFLE_PARAM_STRING = "s";
+
+  // HTTP header ------------------------------------------------------------
+
+  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+  // SSL configurations -----------------------------------------------------
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+      "tajo.pullserver.ssl.file.buffer.size";
+
+  // OS cache configurations ------------------------------------------------
+
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  // Prefetch configurations ------------------------------------------------
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  // Yarn service ID --------------------------------------------------------
+
+  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+  // Standalone pull server -------------------------------------------------
+  public static final String PULLSERVER_STANDALONE_ENV_KEY = "TAJO_PULLSERVER_STANDALONE";
+
+  public static final String PULLSERVER_SERVICE_NAME = "httpshuffle";
+}
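
The Param keys above define the pull server's HTTP query-string protocol. As an illustrative sketch only (this helper
is hypothetical and not part of the patch), a chunk request for a hash shuffle could be assembled from these constants
as follows:

  import org.apache.tajo.pullserver.PullServerConstants;
  import org.apache.tajo.pullserver.PullServerConstants.Param;

  public class PullServerUriExample {
    // Hypothetical helper for illustration: builds the query string of a chunk
    // request for a hash shuffle from the Param keys defined in this patch.
    public static String chunkRequestQuery(String queryId, int ebId, int partId) {
      return Param.REQUEST_TYPE.key() + '=' + PullServerConstants.CHUNK_REQUEST_PARAM_STRING  // 'c' for chunk
          + '&' + Param.SHUFFLE_TYPE.key() + '=' + PullServerConstants.HASH_SHUFFLE_PARAM_STRING  // 'h' for hash
          + '&' + Param.QUERY_ID.key() + '=' + queryId
          + '&' + Param.EB_ID.key() + '=' + ebId
          + '&' + Param.PART_ID.key() + '=' + partId;
    }

    public static void main(String[] args) {
      // Prints e.g.: rtype=c&stype=h&qid=q_1_0001&sid=2&p=0
      System.out.println(chunkRequestQuery("q_1_0001", 2, 0));
    }
  }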