Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:09 UTC

[2/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
index c90f1aa..75f6080 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -18,22 +18,47 @@
 
 package org.apache.tajo.pullserver;
 
-import org.apache.commons.lang.reflect.MethodUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.LoadingCache;
+import com.google.gson.Gson;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.Pair;
 
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
+import java.io.*;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class PullServerUtil {
   private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
 
   private static boolean nativeIOPossible = false;
-  private static Method posixFadviseIfPossible;
 
   static {
-    if (NativeIO.isAvailable() && loadNativeIO()) {
+    if (NativeIO.isAvailable()) {
       nativeIOPossible = true;
     } else {
       LOG.warn("Unable to load hadoop nativeIO");
@@ -53,7 +78,7 @@ public class PullServerUtil {
                                             long offset, long len, int flags) {
     if (nativeIOPossible) {
       try {
-        posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, offset, len, flags);
       } catch (Throwable t) {
         nativeIOPossible = false;
         LOG.warn("Failed to manage OS cache for " + identifier, t);
@@ -61,30 +86,643 @@ public class PullServerUtil {
     }
   }
 
-  /* load hadoop native method if possible */
-  private static boolean loadNativeIO() {
-    boolean loaded = true;
-    if (nativeIOPossible) return loaded;
+  public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+    return StorageUtil.concatPath(
+            queryId,
+            "output",
+            executionBlockSequenceId);
+  }
 
-    Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
-    try {
-      Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
-      Class posixClass;
-      if (getCacheManipulator != null) {
-        Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
-        posixClass = posix.getClass();
+  public static Path getBaseInputDir(String queryId, String executionBlockId) {
+    return StorageUtil.concatPath(
+            queryId,
+            "in",
+            executionBlockId);
+  }
+
+  public static List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+
+  public static boolean isChunkRequest(String requestType) {
+    return requestType.equals(PullServerConstants.CHUNK_REQUEST_PARAM_STRING);
+  }
+
+  public static boolean isMetaRequest(String requestType) {
+    return requestType.equals(PullServerConstants.META_REQUEST_PARAM_STRING);
+  }
+
+  public static boolean isRangeShuffle(String shuffleType) {
+    return shuffleType.equals(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+  }
+
+  public static boolean isHashShuffle(String shuffleType) {
+    return shuffleType.equals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        || shuffleType.equals(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
+  }
+
+  public static class PullServerParams extends HashMap<String, List<String>> {
+
+    public PullServerParams(URI uri) {
+      this(uri.toString());
+    }
+
+    public PullServerParams(String uri) {
+      super(new QueryStringDecoder(uri).parameters());
+    }
+
+    public boolean contains(Param param) {
+      return containsKey(param.key());
+    }
+
+    public List<String> get(Param param) {
+      return get(param.key());
+    }
+
+    private String checkAndGetFirstParam(Param param) {
+      Preconditions.checkArgument(contains(param), "Missing " + param.name());
+      Preconditions.checkArgument(get(param).size() == 1, "Too many params: " + param.name());
+      return get(param).get(0);
+    }
+
+    private List<String> checkAndGet(Param param) {
+      Preconditions.checkArgument(contains(param), "Missing " + param.name());
+      return get(param);
+    }
+
+    public String requestType() {
+      return checkAndGetFirstParam(Param.REQUEST_TYPE);
+    }
+
+    public String shuffleType() {
+      return checkAndGetFirstParam(Param.SHUFFLE_TYPE);
+    }
+
+    public String queryId() {
+      return checkAndGetFirstParam(Param.QUERY_ID);
+    }
+
+    public String ebId() {
+      return checkAndGetFirstParam(Param.EB_ID);
+    }
+
+    public long offset() {
+      return contains(Param.OFFSET) && get(Param.OFFSET).size() == 1 ?
+          Long.parseLong(get(Param.OFFSET).get(0)) : -1L;
+    }
+
+    public long length() {
+      return contains(Param.LENGTH) && get(Param.LENGTH).size() == 1 ?
+          Long.parseLong(get(Param.LENGTH).get(0)) : -1L;
+    }
+
+    public String startKey() {
+      return checkAndGetFirstParam(Param.START);
+    }
+
+    public String endKey() {
+      return checkAndGetFirstParam(Param.END);
+    }
+
+    public boolean last() {
+      return contains(Param.FINAL);
+    }
+
+    public String partId() {
+      return checkAndGetFirstParam(Param.PART_ID);
+    }
+
+    public List<String> taskAttemptIds() {
+      return checkAndGet(Param.TASK_ID);
+    }
+  }
+
+  public static class PullServerRequestURIBuilder {
+    private final StringBuilder builder = new StringBuilder("http://");
+    private String requestType;
+    private String shuffleType;
+    private String queryId;
+    private Integer ebId;
+    private Integer partId;
+    private List<Integer> taskIds;
+    private List<Integer> attemptIds;
+    private List<String> taskAttemptIds;
+    private Long offset;
+    private Long length;
+    private String startKeyBase64;
+    private String endKeyBase64;
+    private boolean last;
+    private final int maxUrlLength;
+
+    public PullServerRequestURIBuilder(String pullServerAddr, int pullServerPort, int maxUrlLength) {
+      this(pullServerAddr, Integer.toString(pullServerPort), maxUrlLength);
+    }
+
+    public PullServerRequestURIBuilder(String pullServerAddr, String pullServerPort, int maxUrlLength) {
+      builder.append(pullServerAddr).append(":").append(pullServerPort).append("/?");
+      this.maxUrlLength = maxUrlLength;
+    }
+
+    public List<URI> build(boolean includeTasks) {
+      append(Param.REQUEST_TYPE, requestType)
+          .append(Param.QUERY_ID, queryId)
+          .append(Param.EB_ID, ebId)
+          .append(Param.PART_ID, partId)
+          .append(Param.SHUFFLE_TYPE, shuffleType);
+
+      if (startKeyBase64 != null) {
+
+        try {
+          append(Param.START, URLEncoder.encode(startKeyBase64, "utf-8"))
+              .append(Param.END, URLEncoder.encode(endKeyBase64, "utf-8"));
+        } catch (UnsupportedEncodingException e) {
+          throw new RuntimeException(e);
+        }
+
+        if (last) {
+          append(Param.FINAL, Boolean.toString(last));
+        }
+      }
+
+      if (length != null) {
+        append(Param.OFFSET, offset.toString())
+            .append(Param.LENGTH, length.toString());
+      }
+
+      List<URI> results = new ArrayList<>();
+      if (!includeTasks || isHashShuffle(shuffleType)) {
+        results.add(URI.create(builder.toString()));
       } else {
-        posixClass = NativeIO.POSIX.class;
+        builder.append(Param.TASK_ID.key()).append("=");
+        List<String> taskAttemptIds = this.taskAttemptIds;
+        if (taskAttemptIds == null) {
+
+          // Sort task ids to increase cache hits in the pull server
+          taskAttemptIds = IntStream.range(0, taskIds.size())
+              .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+              .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+              // In the case of hash shuffle, each partition has a single shuffle file per worker.
+              // TODO: if the file is large, consider multiple fetches (the shuffle file can be split)
+              .filter(pair -> pair.getFirst() >= 0)
+              .map(pair -> pair.getFirst() + "_" + pair.getSecond())
+              .collect(Collectors.toList());
+        }
+
+        // If the GET request is longer than 2000 characters, the request may fail
+        // with HTTP status 414 (Request-URI Too Long).
+        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+        // The code below splits such a long request into multiple requests.
+        List<String> taskIdsParams = new ArrayList<>();
+        StringBuilder taskIdListBuilder = new StringBuilder();
+
+        boolean first = true;
+        for (int i = 0; i < taskAttemptIds.size(); i++) {
+          if (!first) {
+            taskIdListBuilder.append(",");
+          }
+          first = false;
+
+          if (builder.length() + taskIdListBuilder.length() > maxUrlLength) {
+            taskIdsParams.add(taskIdListBuilder.toString());
+            taskIdListBuilder = new StringBuilder(taskAttemptIds.get(i));
+          } else {
+            taskIdListBuilder.append(taskAttemptIds.get(i));
+          }
+        }
+        // flush the remaining task ids, if any
+        if (taskIdListBuilder.length() > 0) {
+          taskIdsParams.add(taskIdListBuilder.toString());
+        }
+        for (String param : taskIdsParams) {
+          results.add(URI.create(builder + param));
+        }
+      }
+
+      return results;
+    }
+
+    private PullServerRequestURIBuilder append(Param key, Object val) {
+      builder.append(key.key())
+          .append("=")
+          .append(val)
+          .append("&");
+
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setRequestType(String type) {
+      this.requestType = type;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setShuffleType(String shuffleType) {
+      this.shuffleType = shuffleType;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setQueryId(String queryId) {
+      this.queryId = queryId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEbId(String ebId) {
+      this.ebId = Integer.parseInt(ebId);
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEbId(Integer ebId) {
+      this.ebId = ebId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setPartId(String partId) {
+      this.partId = Integer.parseInt(partId);
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setPartId(Integer partId) {
+      this.partId = partId;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setTaskIds(List<Integer> taskIds) {
+      this.taskIds = taskIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setAttemptIds(List<Integer> attemptIds) {
+      this.attemptIds = attemptIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setTaskAttemptIds(List<String> taskAttemptIds) {
+      this.taskAttemptIds = taskAttemptIds;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setOffset(long offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setLength(long length) {
+      this.length = length;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setStartKeyBase64(String startKeyBase64) {
+      this.startKeyBase64 = startKeyBase64;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setEndKeyBase64(String endKeyBase64) {
+      this.endKeyBase64 = endKeyBase64;
+      return this;
+    }
+
+    public PullServerRequestURIBuilder setLastInclude(boolean last) {
+      this.last = last;
+      return this;
+    }
+  }
+
+  public static boolean useExternalPullServerService(TajoConf conf) {
+    // TODO: add more service types like mesos
+    return TajoPullServerService.isStandalone()
+        || conf.getBoolVar(ConfVars.YARN_SHUFFLE_SERVICE_ENABLED);
+  }
+
+  private static FileChunkMeta searchFileChunkMeta(String queryId,
+                                                  String ebSeqId,
+                                                  String taskId,
+                                                  Path outDir,
+                                                  String startKey,
+                                                  String endKey,
+                                                  boolean last,
+                                                  LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                                  int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+    SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+        indexReaderCache, lowCacheHitCheckThreshold);
+    // Do not send file chunks of 0 length
+    if (result != null) {
+      long startOffset = result.startOffset;
+      long endOffset = result.endOffset;
+
+      FileChunkMeta chunk = new FileChunkMeta(startOffset, endOffset - startOffset, ebSeqId, taskId);
+
+      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+      return chunk;
+    } else {
+      return null;
+    }
+  }
+
+  private static FileChunk searchFileChunk(String queryId,
+                                           String ebSeqId,
+                                           Path outDir,
+                                           String startKey,
+                                           String endKey,
+                                           boolean last,
+                                           LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                           int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+
+    final SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+        indexReaderCache, lowCacheHitCheckThreshold);
+    if (result != null) {
+      long startOffset = result.startOffset;
+      long endOffset = result.endOffset;
+      FileChunk chunk = new FileChunk(result.data, startOffset, endOffset - startOffset);
+
+      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+      return chunk;
+    } else {
+      return null;
+    }
+  }
+
+  private static class SearchResult {
+    File data;
+    long startOffset;
+    long endOffset;
+
+    public SearchResult(File data, long startOffset, long endOffset) {
+      this.data = data;
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+  }
+
+  private static SearchResult searchCorrespondPart(String queryId,
+                                                   String ebSeqId,
+                                                   Path outDir,
+                                                   String startKey,
+                                                   String endKey,
+                                                   boolean last,
+                                                   LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                                   int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+    BSTIndexReader idxReader = indexReaderCache.get(new IndexCacheKey(outDir, queryId, ebSeqId));
+    idxReader.retain();
+
+    File data;
+    long startOffset;
+    long endOffset;
+    try {
+      if (LOG.isDebugEnabled()) {
+        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+        }
+      }
+
+      Tuple indexedFirst = idxReader.getFirstKey();
+      Tuple indexedLast = idxReader.getLastKey();
+
+      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no contents");
+        }
+        return null;
+      }
+
+      byte[] startBytes = Base64.decodeBase64(startKey);
+      byte[] endBytes = Base64.decodeBase64(endKey);
+
+
+      Tuple start;
+      Tuple end;
+      Schema keySchema = idxReader.getKeySchema();
+      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
+      try {
+        start = decoder.toTuple(startBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("StartKey: " + startKey
+            + ", decoded byte size: " + startBytes.length, t);
+      }
+
+      try {
+        end = decoder.toTuple(endBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("EndKey: " + endKey
+            + ", decoded byte size: " + endBytes.length, t);
       }
-      posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
-    } catch (Throwable e) {
-      loaded = false;
-      LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage(), e);
+
+      data = new File(URI.create(outDir.toUri() + "/output"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+            (last ? ", last=true" : "") + ")");
+      }
+
+      TupleComparator comparator = idxReader.getComparator();
+
+      if (comparator.compare(end, indexedFirst) < 0 ||
+          comparator.compare(indexedLast, start) < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+              "], but request start:" + start + ", end: " + end);
+        }
+        return null;
+      }
+
+      try {
+        idxReader.init();
+        startOffset = idxReader.find(start);
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+      try {
+        endOffset = idxReader.find(end);
+        if (endOffset == -1) {
+          endOffset = idxReader.find(end, true);
+        }
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+
+      // if startOffset == -1 then case 2-1 or case 3
+      if (startOffset == -1) { // this is a hack
+        // if case 2-1 or case 3
+        try {
+          startOffset = idxReader.find(start, true);
+        } catch (IOException ioe) {
+          LOG.error("State Dump (the requested range: "
+              + "[" + start + ", " + end + ")" + ", idx min: "
+              + idxReader.getFirstKey() + ", idx max: "
+              + idxReader.getLastKey());
+          throw ioe;
+        }
+      }
+
+      if (startOffset == -1) {
+        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+            "State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+      }
+
+      // if greater than indexed values
+      if (last || (endOffset == -1
+          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+        endOffset = data.length();
+      }
+    } finally {
+      idxReader.release();
     }
 
-    if (posixFadviseIfPossible == null) {
-      loaded = false;
+    return new SearchResult(data, startOffset, endOffset);
+  }
+
+  /**
+   * Retrieves the meta information of the file chunks which correspond to the requested URI.
+   * Only meta information for file chunks of non-zero length is retrieved.
+   *
+   * @param conf Tajo configuration
+   * @param lDirAlloc local directory allocator
+   * @param localFS local file system
+   * @param params pull server request parameters
+   * @param gson Gson instance for JSON serialization
+   * @param indexReaderCache cache of BST index readers
+   * @param lowCacheHitCheckThreshold cache size above which a low hit rate is reported
+   * @return a list of JSON strings, each encoding the meta information of one file chunk
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public static List<String> getJsonMeta(final TajoConf conf,
+                                         final LocalDirAllocator lDirAlloc,
+                                         final FileSystem localFS,
+                                         final PullServerParams params,
+                                         final Gson gson,
+                                         final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                         final int lowCacheHitCheckThreshold)
+      throws IOException, ExecutionException {
+    final List<String> taskIds = PullServerUtil.splitMaps(params.taskAttemptIds());
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+    final List<String> jsonMetas = new ArrayList<>();
+
+    for (String eachTaskId : taskIds) {
+      Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+      if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+        LOG.warn("Range shuffle - file not exist. " + outputPath);
+        continue;
+      }
+      Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+      FileChunkMeta meta;
+      meta = PullServerUtil.searchFileChunkMeta(params.queryId(), params.ebId(), eachTaskId, path,
+          params.startKey(), params.endKey(), params.last(), indexReaderCache, lowCacheHitCheckThreshold);
+      if (meta != null && meta.getLength() > 0) {
+        String jsonStr = gson.toJson(meta, FileChunkMeta.class);
+        jsonMetas.add(jsonStr);
+      }
+    }
+    return jsonMetas;
+  }
+
+  /**
+   * Retrieves the file chunks which correspond to the requested URI.
+   * Only file chunks of non-zero length are retrieved.
+   *
+   * @param conf Tajo configuration
+   * @param lDirAlloc local directory allocator
+   * @param localFS local file system
+   * @param params pull server request parameters
+   * @param indexReaderCache cache of BST index readers
+   * @param lowCacheHitCheckThreshold cache size above which a low hit rate is reported
+   * @return a list of non-empty file chunks
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public static List<FileChunk> getFileChunks(final TajoConf conf,
+                                              final LocalDirAllocator lDirAlloc,
+                                              final FileSystem localFS,
+                                              final PullServerParams params,
+                                              final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                              final int lowCacheHitCheckThreshold)
+      throws IOException, ExecutionException {
+    final List<FileChunk> chunks = new ArrayList<>();
+
+    final String queryId = params.queryId();
+    final String shuffleType = params.shuffleType();
+    final String sid =  params.ebId();
+
+    final long offset = params.offset();
+    final long length = params.length();
+
+    final Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId, sid);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid);
+
+      // the working dir of tajo worker for each query
+      LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+    }
+
+    // if a stage requires a range shuffle
+    if (PullServerUtil.isRangeShuffle(shuffleType)) {
+      final List<String> taskIdList = params.taskAttemptIds();
+      final List<String> taskIds = PullServerUtil.splitMaps(taskIdList);
+
+      final String startKey = params.startKey();
+      final String endKey = params.endKey();
+      final boolean last = params.last();
+
+      long before = System.currentTimeMillis();
+      for (String eachTaskId : taskIds) {
+        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+        if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+          LOG.warn(outputPath + " does not exist.");
+          continue;
+        }
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+        FileChunk chunk = PullServerUtil.searchFileChunk(queryId, sid, path, startKey, endKey, last, indexReaderCache,
+            lowCacheHitCheckThreshold);
+        if (chunk != null) {
+          chunks.add(chunk);
+        }
+      }
+      long after = System.currentTimeMillis();
+      LOG.info("Index lookup time: " + (after - before) + " ms");
+
+      // if a stage requires a hash shuffle or a scattered hash shuffle
+    } else if (PullServerUtil.isHashShuffle(shuffleType)) {
+
+      final String partId = params.partId();
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+      if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
+        throw new FileNotFoundException(partPath.toString());
+      }
+
+      Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        String errorMessage = "Start pos[" + startPos + "] greater than file length [" + file.length() + "]";
+        throw new EOFException(errorMessage);
+      }
+      FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunks.add(chunk);
+    } else {
+      throw new IllegalArgumentException(shuffleType);
     }
-    return loaded;
+    return chunks.stream().filter(c -> c.length() > 0).collect(Collectors.toList());
   }
 }

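For reference, a minimal usage sketch of the PullServerRequestURIBuilder and PullServerParams classes added above. The host, port, query id, and Base64 keys are illustrative placeholders, not values taken from this commit:

    import java.net.URI;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.tajo.pullserver.PullServerConstants;
    import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
    import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;

    public class UriBuilderSketch {
      public static void main(String[] args) {
        // Build fetch URIs for a range shuffle. build(true) appends the task ids
        // and splits the request into several URIs once maxUrlLength is exceeded.
        List<URI> uris = new PullServerRequestURIBuilder("localhost", 28000, 2000)
            .setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
            .setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING)
            .setQueryId("q_1461737580966_0001")      // placeholder query id
            .setEbId(1)
            .setPartId(0)
            .setTaskIds(Arrays.asList(2, 0, 1))      // unsorted on purpose; the builder sorts them
            .setAttemptIds(Arrays.asList(0, 0, 0))
            .setStartKeyBase64("QUFBQQ==")           // placeholder Base64 row-store keys
            .setEndKeyBase64("QkJCQg==")
            .setLastInclude(true)
            .build(true);

        // The server side re-parses the same key-value pairs from the request URI.
        PullServerParams params = new PullServerParams(uris.get(0));
        System.out.println(params.shuffleType() + " shuffle, eb " + params.ebId()
            + ", tasks " + params.taskAttemptIds());
      }
    }
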
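Similarly, a sketch of calling the static retrieval helper directly. The cache construction mirrors what TajoPullServerService.serviceInit() does in the diff further below; the cache size and threshold here are arbitrary assumptions:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.List;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.conf.TajoConf.ConfVars;
    import org.apache.tajo.pullserver.PullServerUtil;
    import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
    import org.apache.tajo.pullserver.retriever.FileChunk;
    import org.apache.tajo.pullserver.retriever.IndexCacheKey;
    import org.apache.tajo.storage.index.bst.BSTIndex;
    import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;

    public class ChunkLookupSketch {
      public static List<FileChunk> lookup(TajoConf conf, String requestUri) throws Exception {
        // Index readers are cached per (path, queryId, ebSeqId), as in the service below.
        LoadingCache<IndexCacheKey, BSTIndexReader> cache = CacheBuilder.newBuilder()
            .maximumSize(10000) // cf. ConfVars.PULLSERVER_CACHE_SIZE
            .build(new CacheLoader<IndexCacheKey, BSTIndexReader>() {
              @Override
              public BSTIndexReader load(IndexCacheKey key) throws Exception {
                return new BSTIndex(conf).getIndexReader(new Path(key.getPath(), "index"));
              }
            });
        return PullServerUtil.getFileChunks(conf,
            new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname),
            new LocalFileSystem(), new PullServerParams(requestUri), cache,
            100 /* lowCacheHitCheckThreshold, arbitrary */);
      }
    }
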
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index 4609712..069e660 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -54,11 +54,6 @@ public class TajoPullServer extends CompositeService {
     start();
   }
 
-  public void start() {
-    super.start();
-
-  }
-
   public static void main(String[] args) throws Exception {
     StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG);
 

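The override removed here was a no-op: in Hadoop's service framework, AbstractService.start() already drives the state machine and invokes the protected serviceStart() hook, so subclasses only override the service*() methods. A minimal sketch of that lifecycle, assuming hadoop-common on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.CompositeService;

    public class LifecycleSketch extends CompositeService {
      public LifecycleSketch() {
        super("lifecycle-sketch"); // arbitrary service name
      }

      @Override
      protected void serviceStart() throws Exception {
        // startup work belongs here, not in an overridden start()
        super.serviceStart();
      }

      public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch();
        service.init(new Configuration()); // NOTINITED -> INITED, calls serviceInit()
        service.start();                   // INITED -> STARTED, calls serviceStart()
        service.stop();                    // STARTED -> STOPPED, calls serviceStop()
      }
    }
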
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index cbeba52..aa16f87 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -22,8 +22,9 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
-import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.channel.group.ChannelGroup;
@@ -31,12 +32,13 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,42 +58,34 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
 import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.*;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 public class TajoPullServerService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
 
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
   private int port;
   private ServerBootstrap selector;
   private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@@ -99,7 +93,6 @@ public class TajoPullServerService extends AbstractService {
   private int sslFileBufferSize;
   private int maxUrlLength;
 
-  private ApplicationId appId;
   private FileSystem localFS;
 
   /**
@@ -110,62 +103,14 @@ public class TajoPullServerService extends AbstractService {
   private int readaheadLength;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
-  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
-  private static final Map<String,String> userRsrc =
-          new ConcurrentHashMap<>();
-  private String userName;
-
-  private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
-  private static int lowCacheHitCheckThreshold;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "tajo.pullserver.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+  private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+  private int lowCacheHitCheckThreshold;
 
   private static final boolean STANDALONE;
 
-  private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
-  private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
-
-  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
-
-  static class CacheKey {
-    private Path path;
-    private String queryId;
-    private String ebSeqId;
-
-    public CacheKey(Path path, String queryId, String ebSeqId) {
-      this.path = path;
-      this.queryId = queryId;
-      this.ebSeqId = ebSeqId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof CacheKey) {
-        CacheKey other = (CacheKey) o;
-        return Objects.equals(this.path, other.path)
-            && Objects.equals(this.queryId, other.queryId)
-            && Objects.equals(this.ebSeqId, other.ebSeqId);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(path, queryId, ebSeqId);
-    }
-  }
-
   static {
-    /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
-    SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
-    REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
-
-    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
+    String standalone = System.getenv(PullServerConstants.PULLSERVER_STANDALONE_ENV_KEY);
+    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase(Boolean.TRUE.toString());
   }
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,7 +138,7 @@ public class TajoPullServerService extends AbstractService {
   final ShuffleMetrics metrics;
 
   TajoPullServerService(MetricsSystem ms) {
-    super("httpshuffle");
+    super(PullServerConstants.PULLSERVER_SERVICE_NAME);
     metrics = ms.register(new ShuffleMetrics());
   }
 
@@ -202,58 +147,33 @@ public class TajoPullServerService extends AbstractService {
     this(DefaultMetricsSystem.instance());
   }
 
-  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
-    // TODO these bytes should be versioned
-    // TODO: Once SHuffle is out of NM, this can use MR APIs
-    this.appId = appId;
-    this.userName = user;
-    userRsrc.put(appId.toString(), user);
-  }
-
-  public void stopApp(ApplicationId appId) {
-    userRsrc.remove(appId.toString());
-  }
-
+  // TODO change AbstractService to throw InterruptedException
   @Override
-  public void init(Configuration conf) {
-    try {
-      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-          DEFAULT_SHUFFLE_READAHEAD_BYTES);
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    TajoConf tajoConf = (TajoConf) conf;
 
-      int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
-          Runtime.getRuntime().availableProcessors() * 2);
+    manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+        PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
 
-      selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
-                   .option(ChannelOption.TCP_NODELAY, true)
-                   .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
-                   .childOption(ChannelOption.TCP_NODELAY, true);
+    readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+        PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
 
-      localFS = new LocalFileSystem();
+    int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
 
-      maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
-          ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+    selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
+        .option(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .childOption(ChannelOption.TCP_NODELAY, true);
 
-      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
-          , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
-      super.init(conf);
-      LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    }
-  }
+    localFS = new LocalFileSystem();
 
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
+    maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+    LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
 
     ServerBootstrap bootstrap = selector.clone();
-    TajoConf tajoConf = (TajoConf)conf;
     try {
       channelInitializer = new HttpChannelInitializer(tajoConf);
     } catch (Exception ex) {
@@ -263,7 +183,7 @@ public class TajoPullServerService extends AbstractService {
       .channel(NioServerSocketChannel.class);
 
     port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
-    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+    ChannelFuture future = bootstrap.bind(port)
         .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
         .syncUninterruptibly();
 
@@ -272,8 +192,8 @@ public class TajoPullServerService extends AbstractService {
     tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
     LOG.info(getName() + " listening on port " + port);
 
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    sslFileBufferSize = conf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
     int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
     int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
@@ -283,10 +203,10 @@ public class TajoPullServerService extends AbstractService {
         .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
         .removalListener(removalListener)
         .build(
-            new CacheLoader<CacheKey, BSTIndexReader>() {
+            new CacheLoader<IndexCacheKey, BSTIndexReader>() {
               @Override
-              public BSTIndexReader load(CacheKey key) throws Exception {
-                return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+              public BSTIndexReader load(IndexCacheKey key) throws Exception {
+                return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
               }
             }
         );
@@ -353,29 +273,31 @@ public class TajoPullServerService extends AbstractService {
   }
 
   @Override
-  public void stop() {
-    try {
-      accepted.close();
-      if (selector != null) {
-        if (selector.group() != null) {
-          selector.group().shutdownGracefully();
-        }
-        if (selector.childGroup() != null) {
-          selector.childGroup().shutdownGracefully();
-        }
+  public void serviceStop() throws Exception {
+    accepted.close();
+    if (selector != null) {
+      if (selector.group() != null) {
+        selector.group().shutdownGracefully();
       }
-
-      if (channelInitializer != null) {
-        channelInitializer.destroy();
+      if (selector.childGroup() != null) {
+        selector.childGroup().shutdownGracefully();
       }
+    }
 
-      localFS.close();
-      indexReaderCache.invalidateAll();
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    } finally {
-      super.stop();
+    if (channelInitializer != null) {
+      channelInitializer.destroy();
     }
+
+    localFS.close();
+    indexReaderCache.invalidateAll();
+
+    super.serviceStop();
+  }
+
+  public List<FileChunk> getFileChunks(TajoConf conf, LocalDirAllocator lDirAlloc, PullServerParams params)
+      throws IOException, ExecutionException {
+    return PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+        lowCacheHitCheckThreshold);
   }
 
   class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -385,8 +307,7 @@ public class TajoPullServerService extends AbstractService {
 
     public HttpChannelInitializer(TajoConf conf) throws Exception {
       PullServer = new PullServer(conf);
-      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
-          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+      if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
         sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
         sslFactory.init();
       }
@@ -417,72 +338,13 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-
-  Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<>();
-
-  public void completeFileChunk(FileRegion filePart,
-                                   String requestUri,
-                                   long startTime) {
-    ProcessingStatus status = processingStatusMap.get(requestUri);
-    if (status != null) {
-      status.decrementRemainFiles(filePart, startTime);
-    }
-  }
-
-  class ProcessingStatus {
-    String requestUri;
-    int numFiles;
-    long startTime;
-    long makeFileListTime;
-    long minTime = Long.MAX_VALUE;
-    long maxTime;
-    volatile int numSlowFile;
-    volatile int remainFiles;
-
-    public ProcessingStatus(String requestUri) {
-      this.requestUri = requestUri;
-      this.startTime = System.currentTimeMillis();
-    }
-
-    public void setNumFiles(int numFiles) {
-      this.numFiles = numFiles;
-      this.remainFiles = numFiles;
-    }
-
-    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
-      long fileSendTime = System.currentTimeMillis() - fileStartTime;
-
-      if (fileSendTime > maxTime) {
-        maxTime = fileSendTime;
-      }
-      if (fileSendTime < minTime) {
-        minTime = fileSendTime;
-      }
-
-      if (fileSendTime > 20 * 1000) {
-        LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " +
-            "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri);
-        SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1);
-      }
-
-      REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
-      if (REMAIN_FILE_UPDATER.get(this) <= 0) {
-        processingStatusMap.remove(requestUri);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
-              + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
-              + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
-        }
-      }
-    }
-  }
-
   @ChannelHandler.Sharable
   class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
     private final TajoConf conf;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    private final Gson gson = new Gson();
 
     public PullServer(TajoConf conf) throws IOException {
       this.conf = conf;
@@ -512,7 +374,7 @@ public class TajoPullServerService extends AbstractService {
       }
 
       if (request.getMethod() == HttpMethod.DELETE) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
 
         clearIndexCache(request.getUri());
@@ -523,110 +385,117 @@ public class TajoPullServerService extends AbstractService {
       }
 
       // Parsing the URL into key-values
-      Map<String, List<String>> params = null;
       try {
-        params = decodeParams(request.getUri());
+        final PullServerParams params = new PullServerParams(request.getUri());
+        if (PullServerUtil.isChunkRequest(params.requestType())) {
+          handleChunkRequest(ctx, request, params);
+        } else {
+          handleMetaRequest(ctx, request, params);
+        }
       } catch (Throwable e) {
-        LOG.error("Failed to decode uri " + request.getUri());
+        LOG.error("Failed to handle request " + request.getUri());
         sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
         return;
       }
+    }
 
-      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
-      processingStatusMap.put(request.getUri(), processingStatus);
-
-      String partId = params.get("p").get(0);
-      String queryId = params.get("qid").get(0);
-      String shuffleType = params.get("type").get(0);
-      String sid =  params.get("sid").get(0);
-
-      final List<String> taskIdList = params.get("ta");
-      final List<String> offsetList = params.get("offset");
-      final List<String> lengthList = params.get("length");
-
-      long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-      long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-      List<String> taskIds = splitMaps(taskIdList);
-
-      Path queryBaseDir = getBaseOutputDir(queryId, sid);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-            + ", taskIds=" + taskIdList);
-
-        // the working dir of tajo worker for each query
-        LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+    /**
+     * Upon a request from TajoWorker, this method clears the index cache used for fetching
+     * the data of an execution block. It is called whenever an execution block is completed.
+     *
+     * @param uri query URI which indicates the execution block id
+     * @throws IOException
+     * @throws InvalidURLException
+     */
+    public void clearIndexCache(String uri)
+        throws IOException, InvalidURLException {
+      // Simply parse the given uri
+      String[] tokens = uri.split("=");
+      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+        throw new IllegalArgumentException("invalid params: " + uri);
       }
-
-      final List<FileChunk> chunks = Lists.newArrayList();
-
-      // if a stage requires a range shuffle
-      if (shuffleType.equals("r")) {
-        final String startKey = params.get("start").get(0);
-        final String endKey = params.get("end").get(0);
-        final boolean last = params.get("final") != null;
-
-        long before = System.currentTimeMillis();
-        for (String eachTaskId : taskIds) {
-          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
-          if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
-            LOG.warn(outputPath + "does not exist.");
-            continue;
-          }
-          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
-
-          FileChunk chunk;
-          try {
-            chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
-          } catch (Throwable t) {
-            LOG.error("ERROR Request: " + request.getUri(), t);
-            sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
-            return;
+      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+      String queryId = ebId.getQueryId().toString();
+      String ebSeqId = Integer.toString(ebId.getId());
+      List<IndexCacheKey> removed = new ArrayList<>();
+      synchronized (indexReaderCache) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
           }
-          if (chunk != null) {
-            chunks.add(chunk);
+        }
+        indexReaderCache.invalidateAll(removed);
+      }
+      removed.clear();
+      synchronized (waitForRemove) {
+        for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+          IndexCacheKey key = e.getKey();
+          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
           }
         }
-        long after = System.currentTimeMillis();
-        LOG.info("Index lookup time: " + (after - before) + " ms");
-
-        // if a stage requires a hash shuffle or a scattered hash shuffle
-      } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-        int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-        Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
-        if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
-          LOG.warn("Partition shuffle file not exists: " + partPath);
-          sendError(ctx, HttpResponseStatus.NO_CONTENT);
-          return;
+        for (IndexCacheKey eachKey : removed) {
+          waitForRemove.remove(eachKey);
         }
+      }
+    }
+
+    private void handleMetaRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+        throws IOException, ExecutionException {
+      final List<String> jsonMetas;
+      try {
+        jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
+      } catch (ExecutionException e) {
+        // There are some problems in the index cache
+        throw new TajoInternalError(e.getCause());
+      }
 
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.OK,
+          Unpooled.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+      response.headers().set(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+      HttpHeaders.setContentLength(response, response.content().readableBytes());
+      if (HttpHeaders.isKeepAlive(request)) {
+        response.headers().set(Names.CONNECTION, Values.KEEP_ALIVE);
+      }
+      ChannelFuture writeFuture = ctx.writeAndFlush(response);
 
-        File file = new File(path.toUri());
-        long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-        long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+      // Decide whether to close the connection or not.
+      if (!HttpHeaders.isKeepAlive(request)) {
+        // Close the connection when the whole content is written out.
+        writeFuture.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
 
-        if (startPos >= file.length()) {
-          String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
-          LOG.error(errorMessage);
-          sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
-          return;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
-        }
-        FileChunk chunk = new FileChunk(file, startPos, readLen);
-        chunks.add(chunk);
-      } else {
-        LOG.error("Unknown shuffle type: " + shuffleType);
-        sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
+    private void handleChunkRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+        throws IOException {
+      final List<FileChunk> chunks;
+      try {
+        chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+            lowCacheHitCheckThreshold);
+      } catch (FileNotFoundException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+        return;
+      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
         return;
+      } catch (ExecutionException e) {
+        // There are some problems in the index cache
+        throw new TajoInternalError(e.getCause());
       }
 
       // Write the content.
       if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
 
         if (!HttpHeaders.isKeepAlive(request)) {
           ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -637,7 +506,7 @@ public class TajoPullServerService extends AbstractService {
       } else {
         FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
         ChannelFuture writeFuture = null;
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
         StringBuilder sb = new StringBuilder();
         for (FileChunk chunk : file) {
@@ -645,7 +514,7 @@ public class TajoPullServerService extends AbstractService {
           sb.append(Long.toString(chunk.length())).append(",");
         }
         sb.deleteCharAt(sb.length() - 1);
-        HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
+        HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
         HttpHeaders.setContentLength(response, totalSize);
 
         if (HttpHeaders.isKeepAlive(request)) {
@@ -655,7 +524,7 @@ public class TajoPullServerService extends AbstractService {
         writeFuture = ctx.write(response);
 
         for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, chunk, request.getUri());
+          writeFuture = sendFile(ctx, chunk);
           if (writeFuture == null) {
             sendError(ctx, HttpResponseStatus.NOT_FOUND);
             return;
@@ -676,53 +545,8 @@ public class TajoPullServerService extends AbstractService {
       }
     }
 
-    /**
-     * Upon a request from TajoWorker, this method clears the index cache used for fetching data of an execution block.
-     * It is called whenever an execution block is completed.
-     *
-     * @param uri query URI which indicates the execution block id
-     * @throws IOException
-     * @throws InvalidURLException
-     */
-    private void clearIndexCache(String uri) throws IOException, InvalidURLException {
-      // Simply parse the given uri
-      String[] tokens = uri.split("=");
-      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
-        throw new IllegalArgumentException("invalid params: " + uri);
-      }
-      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
-      String queryId = ebId.getQueryId().toString();
-      String ebSeqId = Integer.toString(ebId.getId());
-      List<CacheKey> removed = new ArrayList<>();
-      synchronized (indexReaderCache) {
-        for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
-          CacheKey key = e.getKey();
-          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
-            e.getValue().forceClose();
-            removed.add(e.getKey());
-          }
-        }
-        indexReaderCache.invalidateAll(removed);
-      }
-      removed.clear();
-      synchronized (waitForRemove) {
-        for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
-          CacheKey key = e.getKey();
-          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
-            e.getValue().forceClose();
-            removed.add(e.getKey());
-          }
-        }
-        for (CacheKey eachKey : removed) {
-          waitForRemove.remove(eachKey);
-        }
-      }
-    }
-
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   FileChunk file,
-                                   String requestUri) throws IOException {
-      long startTime = System.currentTimeMillis();
+                                   FileChunk file) throws IOException {
       RandomAccessFile spill = null;
       ChannelFuture writeFuture;
       try {
@@ -732,7 +556,7 @@ public class TajoPullServerService extends AbstractService {
               file.startOffset(), file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
           writeFuture = ctx.write(filePart);
-          writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
+          writeFuture.addListener(new FileCloseListener(filePart));
         } else {
           // HTTPS cannot be done with zero copy.
           final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -763,9 +587,10 @@ public class TajoPullServerService extends AbstractService {
 
     private void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
-      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
-          Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+      ByteBuf content = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, content);
       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+      HttpHeaders.setContentLength(response, content.writerIndex());
 
       // Close the connection as soon as the error message is sent.
       ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -782,12 +607,12 @@ public class TajoPullServerService extends AbstractService {
   }
 
   // Temporary space to wait for the completion of all index lookup operations
-  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
 
   // RemovalListener is triggered when an item is removed from the index reader cache.
   // It closes index readers when they are not used anymore.
   // If they are still being used, they are moved to the waitForRemove map to wait for other operations' completion.
-  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+  private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
     BSTIndexReader reader = removal.getValue();
     if (reader.getReferenceNum() == 0) {
       try {
@@ -800,180 +625,4 @@ public class TajoPullServerService extends AbstractService {
       waitForRemove.put(removal.getKey(), reader);
     }
   };
-
-  public static FileChunk getFileChunks(String queryId,
-                                        String ebSeqId,
-                                        Path outDir,
-                                        String startKey,
-                                        String endKey,
-                                        boolean last) throws IOException, ExecutionException {
-
-    BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
-    idxReader.retain();
-
-    File data;
-    long startOffset;
-    long endOffset;
-    try {
-      if (LOG.isDebugEnabled()) {
-        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
-          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
-        }
-      }
-
-      Tuple indexedFirst = idxReader.getFirstKey();
-      Tuple indexedLast = idxReader.getLastKey();
-
-      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("There is no content");
-        }
-        return null;
-      }
-
-      byte[] startBytes = Base64.decodeBase64(startKey);
-      byte[] endBytes = Base64.decodeBase64(endKey);
-
-
-      Tuple start;
-      Tuple end;
-      Schema keySchema = idxReader.getKeySchema();
-      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-
-      try {
-        start = decoder.toTuple(startBytes);
-      } catch (Throwable t) {
-        throw new IllegalArgumentException("StartKey: " + startKey
-            + ", decoded byte size: " + startBytes.length, t);
-      }
-
-      try {
-        end = decoder.toTuple(endBytes);
-      } catch (Throwable t) {
-        throw new IllegalArgumentException("EndKey: " + endKey
-            + ", decoded byte size: " + endBytes.length, t);
-      }
-
-      data = new File(URI.create(outDir.toUri() + "/output"));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
-            (last ? ", last=true" : "") + ")");
-      }
-
-      TupleComparator comparator = idxReader.getComparator();
-
-      if (comparator.compare(end, indexedFirst) < 0 ||
-          comparator.compare(indexedLast, start) < 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
-              "], but request start:" + start + ", end: " + end);
-        }
-        return null;
-      }
-
-      try {
-        idxReader.init();
-        startOffset = idxReader.find(start);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-      try {
-        endOffset = idxReader.find(end);
-        if (endOffset == -1) {
-          endOffset = idxReader.find(end, true);
-        }
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-
-      // if startOffset == -1 then case 2-1 or case 3
-      if (startOffset == -1) { // this is a hack
-        // if case 2-1 or case 3
-        try {
-          startOffset = idxReader.find(start, true);
-        } catch (IOException ioe) {
-          LOG.error("State Dump (the requested range: "
-              + "[" + start + ", " + end + ")" + ", idx min: "
-              + idxReader.getFirstKey() + ", idx max: "
-              + idxReader.getLastKey());
-          throw ioe;
-        }
-      }
-
-      if (startOffset == -1) {
-        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-            "State Dump (the requested range: "
-            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-      }
-
-      // if greater than indexed values
-      if (last || (endOffset == -1
-          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-        endOffset = data.length();
-      }
-    } finally {
-      idxReader.release();
-    }
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-
-    if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-
-  public static List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
-    }
-    return ret;
-  }
-
-  public static Map<String, List<String>> decodeParams(String uri) {
-    final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
-    final List<String> types = params.get("type");
-    final List<String> qids = params.get("qid");
-    final List<String> ebIds = params.get("sid");
-    final List<String> partIds = params.get("p");
-
-    if (types == null || ebIds == null || qids == null || partIds == null) {
-      throw new IllegalArgumentException("invalid params. required :" + params);
-    }
-
-    if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
-      throw new IllegalArgumentException("invalid params. required :" + params);
-    }
-
-    return params;
-  }
-
-  public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            queryId,
-            "output",
-            executionBlockSequenceId);
-    return workDir;
-  }
-
-  public static Path getBaseInputDir(String queryId, String executionBlockId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            queryId,
-            "in",
-            executionBlockId);
-    return workDir;
-  }
 }

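Note on the removal listener above: it defers closing rather than closing unconditionally. An evicted index reader is closed immediately only when its reference count has already dropped to zero; otherwise it is parked in waitForRemove until the in-flight lookups that still hold it release it. A minimal, self-contained sketch of the same pattern, with a hypothetical ref-counted Reader standing in for BSTIndexReader and Guava's cache API assumed:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;

import java.util.concurrent.ConcurrentHashMap;

public class DeferredCloseCacheSketch {

  // Hypothetical ref-counted reader standing in for BSTIndexReader.
  static class Reader {
    private int refs;
    synchronized void retain() { refs++; }
    synchronized void release() { refs--; }
    synchronized int referenceNum() { return refs; }
    void forceClose() { /* close the underlying file handles */ }
  }

  private final ConcurrentHashMap<String, Reader> waitForRemove = new ConcurrentHashMap<>();

  // Close an evicted reader immediately only when nobody holds a reference;
  // otherwise park it until its in-flight lookups have released it.
  private final RemovalListener<String, Reader> listener = removal -> {
    Reader reader = removal.getValue();
    if (reader.referenceNum() == 0) {
      reader.forceClose();
    } else {
      waitForRemove.put(removal.getKey(), reader);
    }
  };

  final LoadingCache<String, Reader> cache = CacheBuilder.newBuilder()
      .maximumSize(1000)
      .removalListener(listener)
      .build(CacheLoader.from(key -> new Reader()));
}

Parking still-referenced readers in a side map avoids closing a file out from under a lookup that is in progress; presumably the parked reader is closed later, once its last user releases it.
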
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index 67cff21..c5f6a6a 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -36,7 +36,7 @@ public class FileChunk {
    */
   private String ebId;
 
-  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+  public FileChunk(File file, long startOffset, long length) {
     this.file = file;
     this.startOffset = startOffset;
     this.length = length;
@@ -76,6 +76,6 @@ public class FileChunk {
 
   public String toString() {
     return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
-	+ file.getAbsolutePath();
+        + file.getAbsolutePath();
   }
 }

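For reference, the header-building loop in the handler above (a Content-Length equal to the sum of all chunk lengths, plus a comma-separated list of per-chunk lengths, presumably so the fetcher can split the concatenated stream) can be condensed as in the following sketch. It uses the same Netty 4 HttpHeaders helpers the patch uses; the header name is passed in rather than read from PullServerConstants:

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.tajo.pullserver.retriever.FileChunk;

final class ChunkHeaderSketch {
  // Content-Length carries the total payload size, while the custom header
  // lists each chunk's length so the receiver can find chunk boundaries.
  static void setChunkHeaders(HttpResponse response, List<FileChunk> chunks, String headerName) {
    long totalSize = chunks.stream().mapToLong(FileChunk::length).sum();
    String lengths = chunks.stream()
        .map(chunk -> Long.toString(chunk.length()))
        .collect(Collectors.joining(","));
    HttpHeaders.addHeader(response, headerName, lengths);
    HttpHeaders.setContentLength(response, totalSize);
  }
}
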
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
new file mode 100644
index 0000000..3f6b3eb
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+public class FileChunkMeta {
+  private final long startOffset;
+  private final long length;
+  private final String ebId;
+  private final String taskId;
+
+  public FileChunkMeta(long startOffset, long length, String ebId, String taskId) {
+    this.startOffset = startOffset;
+    this.length = length;
+    this.ebId = ebId;
+    this.taskId = taskId;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public String getEbId() {
+    return ebId;
+  }
+
+  public String toString() {
+    return "ebId: " + ebId + ", taskId: " + taskId + " (" + startOffset + ", " + length + ")";
+  }
+}

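FileChunkMeta is a plain value object describing one chunk of a task's shuffle output. A purely illustrative use of its getters, grouping chunk descriptors by task (the sample values below are made up):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.tajo.pullserver.retriever.FileChunkMeta;

final class ChunkMetaDemo {
  public static void main(String[] args) {
    List<FileChunkMeta> metas = Arrays.asList(
        new FileChunkMeta(0, 128, "eb_1", "t_0"),
        new FileChunkMeta(128, 64, "eb_1", "t_1"),
        new FileChunkMeta(192, 256, "eb_1", "t_0"));

    // Group chunk descriptors by task to see what each task contributed.
    Map<String, List<FileChunkMeta>> byTask = metas.stream()
        .collect(Collectors.groupingBy(FileChunkMeta::getTaskId));
    byTask.forEach((task, list) -> System.out.println(task + " -> " + list));
  }
}
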
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
new file mode 100644
index 0000000..2a71c65
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+
+public class IndexCacheKey {
+  private Path path;
+  private String queryId;
+  private String ebSeqId;
+
+  public IndexCacheKey(Path path, String queryId, String ebSeqId) {
+    this.path = path;
+    this.queryId = queryId;
+    this.ebSeqId = ebSeqId;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public String getEbSeqId() {
+    return ebSeqId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof IndexCacheKey) {
+      IndexCacheKey other = (IndexCacheKey) o;
+      return Objects.equals(this.path, other.path)
+          && Objects.equals(this.queryId, other.queryId)
+          && Objects.equals(this.ebSeqId, other.ebSeqId);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path, queryId, ebSeqId);
+  }
+}

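Because equals and hashCode compare (path, queryId, ebSeqId) by value, two keys built independently from the same coordinates address the same cache entry. A small illustration (paths and ids are made up):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.pullserver.retriever.IndexCacheKey;

final class IndexCacheKeyDemo {
  public static void main(String[] args) {
    IndexCacheKey a = new IndexCacheKey(new Path("/tmp/q1/output/3"), "q1", "3");
    IndexCacheKey b = new IndexCacheKey(new Path("/tmp/q1/output/3"), "q1", "3");

    Map<IndexCacheKey, String> cache = new HashMap<>();
    cache.put(a, "reader");
    // Hits the entry stored under 'a' because the keys are equal by value.
    System.out.println(cache.get(b)); // prints "reader"
  }
}

Value-based keys are also what make per-execution-block invalidation possible: entries can be matched by getQueryId()/getEbSeqId() regardless of which key instance was used at insertion time.
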
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn/pom.xml b/tajo-yarn/pom.xml
new file mode 100644
index 0000000..70511a1
--- /dev/null
+++ b/tajo-yarn/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tajo-yarn</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Yarn</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.4.1</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.iq80.snappy</groupId>
+          <artifactId>snappy</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.tajo</groupId>
+          <artifactId>tajo-plan</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-orc</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop-bundle</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>trevni-avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>trevni-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-pullserver</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.tajo</groupId>
+          <artifactId>tajo-rpc-protobuf</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>src</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-source-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- builds source jars and attaches them to the project for publishing -->
+                <id>tajo-java-sources</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar-no-fork</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>
\ No newline at end of file

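The jar-with-dependencies assembly together with provided scope for the Hadoop artifacts suggests this module is meant to be dropped onto the NodeManager classpath and registered through the standard yarn.nodemanager.aux-services and yarn.nodemanager.aux-services.<name>.class keys in yarn-site.xml. A skeletal sketch of what such a service looks like against Hadoop's AuxiliaryService API; the class name and service name below are illustrative assumptions, not taken from this patch:

import java.nio.ByteBuffer;

import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

// Illustrative only: the actual service class in this patch may differ.
public class PullServerAuxServiceSketch extends AuxiliaryService {

  public PullServerAuxServiceSketch() {
    super("tajo_pullserver"); // assumed service name
  }

  @Override
  public void initializeApplication(ApplicationInitializationContext context) {
    // Per-application setup would go here.
  }

  @Override
  public void stopApplication(ApplicationTerminationContext context) {
    // Per-application cleanup would go here.
  }

  @Override
  public ByteBuffer getMetaData() {
    // Aux services typically serialize the port the shuffle server listens on.
    return ByteBuffer.allocate(0);
  }
}
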
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
new file mode 100644
index 0000000..3c0a76f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}
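
FadvisedChunkedFile pairs two kernel hints: readahead on the upcoming window while streaming, and POSIX_FADV_DONTNEED over the served range on close, so that large spill files do not evict hotter data from the page cache. The same lifecycle, reduced to a plain sequential read as a sketch (the identifier string, buffer size, and readahead window are arbitrary choices):

import java.io.RandomAccessFile;

import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.tajo.pullserver.PullServerUtil;

public class FadviseSketch {
  // Serve a spill file sequentially, hinting the kernel as we go.
  public static void serve(String path) throws Exception {
    try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
      ReadaheadPool pool = ReadaheadPool.getInstance();
      ReadaheadPool.ReadaheadRequest req = null;
      final long end = raf.length();
      final byte[] buf = new byte[64 * 1024];
      long off = 0;
      while (off < end) {
        // Ask the kernel to prefetch the next window before we read it.
        req = pool.readaheadStream("spill", raf.getFD(), off, 4 * 1024 * 1024, end, req);
        int n = raf.read(buf); // stand-in for the real transfer to the socket
        if (n < 0) break;
        off += n;
      }
      if (req != null) req.cancel();
      // Once the whole range has been served, drop it from the page cache.
      PullServerUtil.posixFadviseIfPossible("spill", raf.getFD(), 0, end,
          NativeIO.POSIX.POSIX_FADV_DONTNEED);
    }
  }
}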