Posted to commits@tajo.apache.org by ji...@apache.org on 2015/12/24 10:18:24 UTC

[1/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.

Repository: tajo
Updated Branches:
  refs/heads/master e8ee7f2bf -> 1f9ae1da0


http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 74805ce..6d9639c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -25,17 +25,19 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
@@ -53,6 +55,7 @@ import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -92,11 +95,13 @@ public class TaskImpl implements Task {
   private long endTime;
 
   private List<FileChunk> localChunks;
+  private List<FileChunk> remoteChunks;
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
 
   private TupleComparator sortComp = null;
+  private final int maxUrlLength;
 
   public TaskImpl(final TaskRequest request,
                   final ExecutionBlockContext executionBlockContext) throws IOException {
@@ -122,6 +127,7 @@ public class TaskImpl implements Task {
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
     this.context.setState(TaskAttemptState.TA_PENDING);
+    this.maxUrlLength = systemConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
   }
 
   public void initPlan() throws IOException {
@@ -148,14 +154,15 @@ public class TaskImpl implements Task {
     }
 
     this.localChunks = Collections.synchronizedList(new ArrayList<>());
+    this.remoteChunks = Collections.synchronizedList(new ArrayList<>());
 
     LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: %s, Fragments: %d, Fetches:%d, " +
         "Local dir: %s", request.getId(), interQuery, shuffleType, request.getFragments().size(),
         request.getFetches().size(), taskDir));
 
     if(LOG.isDebugEnabled()) {
-      for (FetchImpl f : request.getFetches()) {
-        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      for (FetchProto f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + Repartitioner.createSimpleURIs(maxUrlLength, f));
       }
     }
 
@@ -250,6 +257,18 @@ public class TaskImpl implements Task {
 
   @Override
   public void fetch(ExecutorService fetcherExecutor) {
+    // Sort the execution order of fetch runners to increase the cache hit in pull server
+    fetcherRunners.sort((f1, f2) -> {
+      String strUri = f1.getURI().toString();
+      int index = strUri.lastIndexOf("&ta");
+      String taskIdStr1 = strUri.substring(index + "&ta".length());
+
+      strUri = f2.getURI().toString();
+      index = strUri.lastIndexOf("&ta");
+      String taskIdStr2 = strUri.substring(index + "&ta".length());
+      return taskIdStr1.compareTo(taskIdStr2);
+    });
+
     for (Fetcher f : fetcherRunners) {
       fetcherExecutor.submit(new FetchRunner(context, f));
     }
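
The comparator above orders fetchers by the task-id list that trails each fetch URI (everything after the last "&ta"), so requests touching the same task outputs arrive at the pull server back to back and can reuse its cached index readers. A minimal sketch of the effect; the URIs below are hypothetical:

    import java.util.*;

    public class FetchOrderSketch {
      public static void main(String[] args) {
        List<String> uris = new ArrayList<>(Arrays.asList(
            "http://w1:8080/?qid=q1&sid=2&p=0&type=r&ta=3_0,5_0",
            "http://w2:8080/?qid=q1&sid=2&p=1&type=r&ta=1_0,2_0",
            "http://w3:8080/?qid=q1&sid=2&p=2&type=r&ta=1_0,2_0"));
        // Same key as the comparator in fetch(): the substring after the last "&ta".
        uris.sort(Comparator.comparing(u -> u.substring(u.lastIndexOf("&ta") + "&ta".length())));
        // The two URIs asking for tasks 1_0,2_0 are now adjacent, so the pull
        // server can serve both from the same cached index reader entries.
        uris.forEach(System.out::println);
      }
    }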
@@ -375,8 +394,7 @@ public class TaskImpl implements Task {
       if (broadcastTableNames.contains(inputTable)) {
         continue;
       }
-      File tableDir = new File(context.getFetchIn(), inputTable);
-      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+      FileFragment[] frags = localizeFetchedData(inputTable);
       context.updateAssignedFragments(inputTable, frags);
     }
   }
@@ -540,24 +558,21 @@ public class TaskImpl implements Task {
     return false;
   }
 
-  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+  private FileFragment[] localizeFetchedData(String name)
       throws IOException {
 
     Configuration c = new Configuration(systemConf);
     c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
     FileSystem fs = FileSystem.get(c);
-    Path tablePath = new Path(file.getAbsolutePath());
 
     List<FileFragment> listTablets = new ArrayList<>();
     FileFragment tablet;
 
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus f : fileLists) {
-      if (f.getLen() == 0) {
-        continue;
+    for (FileChunk chunk : remoteChunks) {
+      if (name.equals(chunk.getEbId())) {
+        tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())), chunk.startOffset(), chunk.length());
+        listTablets.add(tablet);
       }
-      tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen());
-      listTablets.add(tablet);
     }
 
     // Special treatment for locally pseudo fetched chunks
@@ -604,11 +619,16 @@ public class TaskImpl implements Task {
             LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
           }
           try {
-            FileChunk fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
-                && fetched.getFile() != null) {
-              if (fetched.fromRemote() == false) {
-                localChunks.add(fetched);
+            List<FileChunk> fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+              for (FileChunk eachFetch : fetched) {
+                if (eachFetch.getFile() != null) {
+                  if (!eachFetch.fromRemote()) {
+                    localChunks.add(eachFetch);
+                  } else {
+                    remoteChunks.add(eachFetch);
+                  }
+                }
               }
               break;
             }
@@ -658,7 +678,7 @@ public class TaskImpl implements Task {
   }
 
   private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchImpl> fetches) throws IOException {
+                                        List<FetchProto> fetches) throws IOException {
 
     if (fetches.size() > 0) {
       Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -668,50 +688,59 @@ public class TaskImpl implements Task {
       int localStoreChunkCount = 0;
       File storeDir;
       File defaultStoreFile;
-      FileChunk storeChunk = null;
+      List<FileChunk> storeChunkList = new ArrayList<>();
       List<Fetcher> runnerList = Lists.newArrayList();
 
-      for (FetchImpl f : fetches) {
+      for (FetchProto f : fetches) {
         storeDir = new File(inputDir.toString(), f.getName());
         if (!storeDir.exists()) {
           if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir);
         }
 
-        for (URI uri : f.getURIs()) {
+        for (URI uri : Repartitioner.createFullURIs(maxUrlLength, f)) {
+          storeChunkList.clear();
           defaultStoreFile = new File(storeDir, "in_" + i);
           InetAddress address = InetAddress.getByName(uri.getHost());
 
           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
 
-            storeChunk = getLocalStoredFileChunk(uri, systemConf);
+            List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
 
-            // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-            // So, we should skip and don't need to create storeChunk.
-            if (storeChunk == null || storeChunk.length() == 0) {
-              continue;
-            }
+            for (FileChunk localChunk : localChunkCandidates) {
+              // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+              // So, we should skip and don't need to create storeChunk.
+              if (localChunk == null || localChunk.length() == 0) {
+                continue;
+              }
 
-            if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
-              storeChunk.setFromRemote(false);
-              localStoreChunkCount++;
-            } else {
-              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-              storeChunk.setFromRemote(true);
+              if (localChunk.getFile() != null && localChunk.startOffset() > -1) {
+                localChunk.setFromRemote(false);
+                localStoreChunkCount++;
+              } else {
+                localChunk = new FileChunk(defaultStoreFile, 0, -1);
+                localChunk.setFromRemote(true);
+              }
+              localChunk.setEbId(f.getName());
+              storeChunkList.add(localChunk);
             }
+
           } else {
-            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-            storeChunk.setFromRemote(true);
+            FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
+            remoteChunk.setFromRemote(true);
+            remoteChunk.setEbId(f.getName());
+            storeChunkList.add(remoteChunk);
           }
 
           // If we decide that intermediate data should be really fetched from a remote host, storeChunk
           // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-          storeChunk.setEbId(f.getName());
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-          runnerList.add(fetcher);
-          i++;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+          for (FileChunk eachChunk : storeChunkList) {
+            Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
+            runnerList.add(fetcher);
+            i++;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
+            }
           }
         }
       }
@@ -724,7 +753,7 @@ public class TaskImpl implements Task {
     }
   }
 
-  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+  private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
     // Parse the URI
 
     // Parsing the URL into key-values
@@ -749,28 +778,37 @@ public class TaskImpl implements Task {
 
     // The working directory of Tajo worker for each query, including stage
     Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
-    List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
 
-    FileChunk chunk;
+    List<FileChunk> chunkList = new ArrayList<>();
     // If the stage requires a range shuffle
     if (shuffleType.equals("r")) {
 
-      Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
-        LOG.warn("Range shuffle - file not exist. " + outputPath);
-        return null;
+      final String startKey = params.get("start").get(0);
+      final String endKey = params.get("end").get(0);
+      final boolean last = params.get("final") != null;
+      final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
+
+      long before = System.currentTimeMillis();
+      for (String eachTaskId : taskIds) {
+        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+        if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
+          LOG.warn("Range shuffle - file not exist. " + outputPath);
+          continue;
+        }
+        Path path = executionBlockContext.getLocalFS().makeQualified(
+            executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
+
+        try {
+          FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
+          chunkList.add(chunk);
+        } catch (Throwable t) {
+          LOG.error(t.getMessage(), t);
+          throw new IOException(t.getCause());
+        }
       }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
-      String startKey = params.get("start").get(0);
-      String endKey = params.get("end").get(0);
-      boolean last = params.get("final") != null;
-
-      try {
-        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-      } catch (Throwable t) {
-        LOG.error(t.getMessage(), t);
-        return null;
+      long after = System.currentTimeMillis();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Index lookup time: " + (after - before) + " ms");
       }
 
       // If the stage requires a hash shuffle or a scattered hash shuffle
@@ -779,8 +817,7 @@ public class TaskImpl implements Task {
       Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
 
       if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
-        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-        return null;
+        throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
       }
       Path path = executionBlockContext.getLocalFS().makeQualified(
         executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
@@ -789,17 +826,16 @@ public class TaskImpl implements Task {
       long readLen = (offset >= 0 && length >= 0) ? length : file.length();
 
       if (startPos >= file.length()) {
-        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-        return null;
+        throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
       }
-      chunk = new FileChunk(file, startPos, readLen);
+      FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunkList.add(chunk);
 
     } else {
-      LOG.error("Unknown shuffle type");
-      return null;
+      throw new IOException("Unknown shuffle type");
     }
 
-    return chunk;
+    return chunkList;
   }
 
   public static Path getTaskAttemptDir(TaskAttemptId quid) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto
index 3b9a85e..3643a97 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -79,15 +79,17 @@ message FetchProto {
   required ExecutionBlockIdProto execution_block_id = 4;
   required int32 partition_id = 5;
   required string name = 6;
-  optional string range_params = 7;
-  optional bool has_next = 8 [default = false];
+  optional bytes range_start = 7;
+  optional bytes range_end = 8;
+  optional bool range_last_inclusive = 9;
+  optional bool has_next = 10 [default = false];
 
   // repeated part
-  repeated int32 task_id = 9 [packed = true];
-  repeated int32 attempt_id = 10 [packed = true];
+  repeated int32 task_id = 11 [packed = true];
+  repeated int32 attempt_id = 12 [packed = true];
 
-  optional int64 offset = 11;
-  optional int64 length = 12;
+  optional int64 offset = 13;
+  optional int64 length = 14;
 }
 
 message TaskStatusProto {
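
With this layout a range fetch carries its sort keys as raw bytes (range_start, range_end) plus an inclusiveness flag, instead of the pre-rendered range_params string, which keeps each fetch proto compact in the query master. A minimal construction sketch using the standard protobuf-java builder API; the key bytes are placeholders, and the other required fields (execution_block_id, partition_id, name, and fields 1-3 not shown in this hunk) must still be set before build():

    import com.google.protobuf.ByteString;
    import org.apache.tajo.ResourceProtos.FetchProto;

    byte[] startKeyBytes = {0x00};                 // placeholder range keys
    byte[] endKeyBytes   = {0x7f};
    FetchProto.Builder b = FetchProto.newBuilder()
        .setRangeStart(ByteString.copyFrom(startKeyBytes))
        .setRangeEnd(ByteString.copyFrom(endKeyBytes))
        .setRangeLastInclusive(true);              // end key is inclusive for the last range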

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
index f2f903b..3e17c8a 100644
--- a/tajo-core/src/main/resources/webapps/worker/task.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -21,26 +21,24 @@
 
 <%@ page import="org.apache.tajo.ExecutionBlockId" %>
 <%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.ResourceProtos.FetchProto" %>
 <%@ page import="org.apache.tajo.ResourceProtos.ShuffleFileOutput" %>
 <%@ page import="org.apache.tajo.TaskId" %>
 <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.querymaster.Query" %>
-<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.querymaster.Stage" %>
-<%@ page import="org.apache.tajo.querymaster.Task" %>
+<%@ page import="org.apache.tajo.querymaster.*" %>
 <%@ page import="org.apache.tajo.storage.DataLocation" %>
 <%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
 <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="java.net.URI" %>
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.conf.TajoConf.ConfVars" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -64,6 +62,9 @@
         return;
     }
 
+    int maxUrlLength = tajoWorker.getConfig().getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+            ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
     Query query = queryMasterTask.getQuery();
     Stage stage = query.getStage(ebid);
 
@@ -110,11 +111,11 @@
 
     String fetchInfo = "";
     delim = "";
-    for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet()) {
+    for (Map.Entry<String, Set<FetchProto>> e : task.getFetchMap().entrySet()) {
         fetchInfo += delim + "<b>" + e.getKey() + "</b>";
         delim = "<br/>";
-        for (FetchImpl f : e.getValue()) {
-            for (URI uri : f.getSimpleURIs()){
+        for (FetchProto f : e.getValue()) {
+            for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)){
                 fetchInfo += delim + uri;
             }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index 68e8093..15cfdf4 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.exception.ExceptionUtil;
 import org.apache.tajo.pullserver.retriever.DataRetriever;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 
@@ -84,7 +85,7 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
     FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
 
     // Write the content.
-    if (file == null) {
+    if (file.length == 0) {
       HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
       if (!HttpHeaders.isKeepAlive(request)) {
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -171,7 +172,8 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
       return;
     }
 
-    LOG.error(cause.getMessage(), cause);
+    LOG.error(cause.getMessage());
+    ExceptionUtil.printStackTraceIfError(LOG, cause);
     if (ch.isActive()) {
       sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index ef3d7e0..4a7f2da 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -18,6 +18,10 @@
 
 package org.apache.tajo.pullserver;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
 import com.google.common.collect.Lists;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.Unpooled;
@@ -53,24 +57,29 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.InvalidURLException;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public class TajoPullServerService extends AbstractService {
@@ -88,6 +97,7 @@ public class TajoPullServerService extends AbstractService {
   private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   private HttpChannelInitializer channelInitializer;
   private int sslFileBufferSize;
+  private int maxUrlLength;
 
   private ApplicationId appId;
   private FileSystem localFS;
@@ -100,32 +110,62 @@ public class TajoPullServerService extends AbstractService {
   private int readaheadLength;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
-
   public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
 
   private static final Map<String,String> userRsrc =
           new ConcurrentHashMap<>();
   private String userName;
 
+  private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
+  private static int lowCacheHitCheckThreshold;
+
   public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
     "tajo.pullserver.ssl.file.buffer.size";
 
   public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
 
-  private static boolean STANDALONE = false;
+  private static final boolean STANDALONE;
 
   private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
   private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
 
+  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+  static class CacheKey {
+    private Path path;
+    private String queryId;
+    private String ebSeqId;
+
+    public CacheKey(Path path, String queryId, String ebSeqId) {
+      this.path = path;
+      this.queryId = queryId;
+      this.ebSeqId = ebSeqId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof CacheKey) {
+        CacheKey other = (CacheKey) o;
+        return Objects.equals(this.path, other.path)
+            && Objects.equals(this.queryId, other.queryId)
+            && Objects.equals(this.ebSeqId, other.ebSeqId);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(path, queryId, ebSeqId);
+    }
+  }
+
   static {
     /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
     SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
     REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
 
     String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    if (!StringUtils.isEmpty(standalone)) {
-      STANDALONE = standalone.equalsIgnoreCase("true");
-    }
+    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
   }
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,6 +233,9 @@ public class TajoPullServerService extends AbstractService {
 
       localFS = new LocalFileSystem();
 
+      maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+          ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
       conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
           , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
       super.init(conf);
@@ -219,20 +262,35 @@ public class TajoPullServerService extends AbstractService {
     bootstrap.childHandler(channelInitializer)
       .channel(NioServerSocketChannel.class);
 
-    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-        ConfVars.PULLSERVER_PORT.defaultIntVal);
+    port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
     ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
         .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
         .syncUninterruptibly();
 
     accepted.add(future.channel());
     port = ((InetSocketAddress)future.channel().localAddress()).getPort();
-    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+    tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
     LOG.info(getName() + " listening on port " + port);
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
+    int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
+    int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+
+    indexReaderCache = CacheBuilder.newBuilder()
+        .maximumSize(cacheSize)
+        .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+        .removalListener(removalListener)
+        .build(
+            new CacheLoader<CacheKey, BSTIndexReader>() {
+              @Override
+              public BSTIndexReader load(CacheKey key) throws Exception {
+                return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+              }
+            }
+        );
+    lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
 
     if (STANDALONE) {
       File pullServerPortFile = getPullServerPortFile();
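
indexReaderCache is a Guava LoadingCache: a reader is opened on demand by the CacheLoader, evicted by size or after cacheTimeout minutes, and handed to removalListener (defined further down in this file) so it is only closed once no lookup still holds a reference. A standalone sketch of the same wiring, with String/Closeable stand-ins for CacheKey/BSTIndexReader:

    import com.google.common.cache.*;
    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class IndexCacheSketch {
      // Stand-in listener: closes the evicted value immediately. The commit's
      // listener instead parks still-referenced readers in waitForRemove.
      static final RemovalListener<String, Closeable> ON_REMOVAL = removal -> {
        try {
          removal.getValue().close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      };

      static final LoadingCache<String, Closeable> CACHE = CacheBuilder.newBuilder()
          .maximumSize(10_000)                    // ConfVars.PULLSERVER_CACHE_SIZE
          .expireAfterWrite(5, TimeUnit.MINUTES)  // ConfVars.PULLSERVER_CACHE_TIMEOUT
          .removalListener(ON_REMOVAL)
          .build(new CacheLoader<String, Closeable>() {
            @Override
            public Closeable load(String key) {
              return () -> {};                    // the real loader opens a BSTIndexReader on <path>/index
            }
          });
    }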
@@ -312,6 +370,7 @@ public class TajoPullServerService extends AbstractService {
       }
 
       localFS.close();
+      indexReaderCache.invalidateAll();
     } catch (Throwable t) {
       LOG.error(t, t);
     } finally {
@@ -348,7 +407,7 @@ public class TajoPullServerService extends AbstractService {
 
       int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
           ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
-      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
       pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", PullServer);
@@ -422,7 +481,6 @@ public class TajoPullServerService extends AbstractService {
   class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
     private final TajoConf conf;
-//    private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
@@ -447,22 +505,36 @@ public class TajoPullServerService extends AbstractService {
     public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
             throws Exception {
 
-      if (request.getMethod() != HttpMethod.GET) {
-        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+      if (request.getDecoderResult().isFailure()) {
+        LOG.error("Http decoding failed. ", request.getDecoderResult().cause());
+        sendError(ctx, request.getDecoderResult().toString(), HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
-      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
-      processingStatusMap.put(request.getUri().toString(), processingStatus);
+      if (request.getMethod() == HttpMethod.DELETE) {
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+
+        clearIndexCache(request.getUri());
+        return;
+      } else if (request.getMethod() != HttpMethod.GET) {
+        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+        return;
+      }
 
       // Parsing the URL into key-values
       Map<String, List<String>> params = null;
       try {
         params = decodeParams(request.getUri());
       } catch (Throwable e) {
+        LOG.error("Failed to decode uri " + request.getUri());
         sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
       }
 
+      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
+      processingStatusMap.put(request.getUri(), processingStatus);
+
       String partId = params.get("p").get(0);
       String queryId = params.get("qid").get(0);
       String shuffleType = params.get("type").get(0);
@@ -491,28 +563,33 @@ public class TajoPullServerService extends AbstractService {
 
       // if a stage requires a range shuffle
       if (shuffleType.equals("r")) {
-        Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
-        if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
-          LOG.warn(outputPath + "does not exist.");
-          sendError(ctx, HttpResponseStatus.NO_CONTENT);
-          return;
-        }
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
-        String startKey = params.get("start").get(0);
-        String endKey = params.get("end").get(0);
-        boolean last = params.get("final") != null;
-
-        FileChunk chunk;
-        try {
-          chunk = getFileChunks(path, startKey, endKey, last);
-        } catch (Throwable t) {
-          LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
-          return;
-        }
-        if (chunk != null) {
-          chunks.add(chunk);
+        final String startKey = params.get("start").get(0);
+        final String endKey = params.get("end").get(0);
+        final boolean last = params.get("final") != null;
+
+        long before = System.currentTimeMillis();
+        for (String eachTaskId : taskIds) {
+          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+          if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+            LOG.warn(outputPath + "does not exist.");
+            continue;
+          }
+          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+          FileChunk chunk;
+          try {
+            chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
+          } catch (Throwable t) {
+            LOG.error("ERROR Request: " + request.getUri(), t);
+            sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
+            return;
+          }
+          if (chunk != null) {
+            chunks.add(chunk);
+          }
         }
+        long after = System.currentTimeMillis();
+        LOG.info("Index lookup time: " + (after - before) + " ms");
 
         // if a stage requires a hash shuffle or a scattered hash shuffle
       } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
@@ -536,7 +613,9 @@ public class TajoPullServerService extends AbstractService {
           sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
           return;
         }
-        LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+        }
         FileChunk chunk = new FileChunk(file, startPos, readLen);
         chunks.add(chunk);
       } else {
@@ -545,8 +624,6 @@ public class TajoPullServerService extends AbstractService {
         return;
       }
 
-      processingStatus.setNumFiles(chunks.size());
-      processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
       // Write the content.
       if (chunks.size() == 0) {
         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
@@ -562,9 +639,13 @@ public class TajoPullServerService extends AbstractService {
         ChannelFuture writeFuture = null;
         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
+        StringBuilder sb = new StringBuilder();
         for (FileChunk chunk : file) {
           totalSize += chunk.length();
+          sb.append(Long.toString(chunk.length())).append(",");
         }
+        sb.deleteCharAt(sb.length() - 1);
+        HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
         HttpHeaders.setContentLength(response, totalSize);
 
         if (HttpHeaders.isKeepAlive(request)) {
@@ -580,6 +661,7 @@ public class TajoPullServerService extends AbstractService {
             return;
           }
         }
+
         if (ctx.pipeline().get(SslHandler.class) == null) {
           writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
         } else {
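
Since one range request can now match several task outputs, the response body is a concatenation of chunks, and the new "c" header (CHUNK_LENGTH_HEADER_NAME) carries each chunk's length, comma-separated, so the fetcher can cut the stream back into separate files; this is also why Fetcher.get() returns a List<FileChunk> in TaskImpl.java above. A client-side sketch, assuming a header value as produced by the loop above:

    // e.g. "1024,2048" means the body holds a 1024-byte chunk followed by a 2048-byte one
    String header = "1024,2048";          // value of the "c" response header
    long offset = 0;
    for (String len : header.split(",")) {
      long length = Long.parseLong(len);
      // a real fetcher copies body[offset, offset + length) into its own store file
      offset += length;
    }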
@@ -594,6 +676,49 @@ public class TajoPullServerService extends AbstractService {
       }
     }
 
+    /**
+     * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+     * It is called whenever an execution block is completed.
+     *
+     * @param uri query URI which indicates the execution block id
+     * @throws IOException
+     * @throws InvalidURLException
+     */
+    private void clearIndexCache(String uri) throws IOException, InvalidURLException {
+      // Simply parse the given uri
+      String[] tokens = uri.split("=");
+      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+        throw new IllegalArgumentException("invalid params: " + uri);
+      }
+      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+      String queryId = ebId.getQueryId().toString();
+      String ebSeqId = Integer.toString(ebId.getId());
+      List<CacheKey> removed = new ArrayList<>();
+      synchronized (indexReaderCache) {
+        for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+          CacheKey key = e.getKey();
+          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        indexReaderCache.invalidateAll(removed);
+      }
+      removed.clear();
+      synchronized (waitForRemove) {
+        for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+          CacheKey key = e.getKey();
+          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        for (CacheKey eachKey : removed) {
+          waitForRemove.remove(eachKey);
+        }
+      }
+    }
+
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                    FileChunk file,
                                    String requestUri) throws IOException {
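
Given the split("=") check inside clearIndexCache, the DELETE request must carry a URI of exactly the form ebid=<executionBlockId>; a sketch with a placeholder id:

    String ebId = "eb_1450948676900_0001_000002";   // placeholder execution block id
    String requestUri = "ebid=" + ebId;             // what clearIndexCache(uri) expects to parse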
@@ -617,7 +742,7 @@ public class TajoPullServerService extends AbstractService {
           writeFuture = ctx.write(new HttpChunkedInput(chunk));
         }
       } catch (FileNotFoundException e) {
-        LOG.info(file.getFile() + " not found");
+        LOG.fatal(file.getFile() + " not found");
         return null;
       } catch (Throwable e) {
         if (spill != null) {
@@ -656,26 +781,65 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-  public static FileChunk getFileChunks(Path outDir,
+  // Temporal space to wait for the completion of all index lookup operations
+  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+
+  // RemovalListener is triggered when an item is removed from the index reader cache.
+  // It closes index readers when they are not used anymore.
+  // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
+  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+    BSTIndexReader reader = removal.getValue();
+    if (reader.getReferenceNum() == 0) {
+      try {
+        reader.close(); // tear down properly
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      waitForRemove.remove(removal.getKey());
+    } else {
+      waitForRemove.put(removal.getKey(), reader);
+    }
+  };
+
+  public static FileChunk getFileChunks(String queryId,
+                                        String ebSeqId,
+                                        Path outDir,
                                         String startKey,
                                         String endKey,
-                                        boolean last) throws IOException {
-    BSTIndex index = new BSTIndex(new TajoConf());
-    try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) {
-      Schema keySchema = idxReader.getKeySchema();
-      TupleComparator comparator = idxReader.getComparator();
+                                        boolean last) throws IOException, ExecutionException {
+
+    BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
+    idxReader.retain();
 
+    File data;
+    long startOffset;
+    long endOffset;
+    try {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+        }
+      }
+
+      Tuple indexedFirst = idxReader.getFirstKey();
+      Tuple indexedLast = idxReader.getLastKey();
+
+      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no contents");
+        }
+        return null;
       }
 
-      File data = new File(URI.create(outDir.toUri() + "/output"));
       byte[] startBytes = Base64.decodeBase64(startKey);
       byte[] endBytes = Base64.decodeBase64(endKey);
 
-      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
       Tuple start;
       Tuple end;
+      Schema keySchema = idxReader.getKeySchema();
+      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
       try {
         start = decoder.toTuple(startBytes);
       } catch (Throwable t) {
@@ -690,23 +854,23 @@ public class TajoPullServerService extends AbstractService {
             + ", decoded byte size: " + endBytes.length, t);
       }
 
-      LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
-          (last ? ", last=true" : "") + ")");
-
-      if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-        LOG.info("There is no contents");
-        return null;
+      data = new File(URI.create(outDir.toUri() + "/output"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+            (last ? ", last=true" : "") + ")");
       }
 
-      if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-          comparator.compare(idxReader.getLastKey(), start) < 0) {
-        LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-            "], but request start:" + start + ", end: " + end);
+      TupleComparator comparator = idxReader.getComparator();
+
+      if (comparator.compare(end, indexedFirst) < 0 ||
+          comparator.compare(indexedLast, start) < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+              "], but request start:" + start + ", end: " + end);
+        }
         return null;
       }
 
-      long startOffset;
-      long endOffset;
       try {
         idxReader.init();
         startOffset = idxReader.find(start);
@@ -756,12 +920,14 @@ public class TajoPullServerService extends AbstractService {
           && comparator.compare(idxReader.getLastKey(), end) < 0)) {
         endOffset = data.length();
       }
+    } finally {
+      idxReader.release();
+    }
 
-      FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
 
-      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
-      return chunk;
-    }
+    if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+    return chunk;
   }
 
   public static List<String> splitMaps(List<String> mapq) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index edb1179..e0051f4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -42,6 +42,8 @@ import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
 
@@ -440,6 +442,9 @@ public class BSTIndex implements IndexMethod {
     }
   }
 
+  private static final AtomicIntegerFieldUpdater<BSTIndexReader> REFERENCE_UPDATER =
+      AtomicIntegerFieldUpdater.newUpdater(BSTIndexReader.class, "referenceNum");
+
   /**
    * BSTIndexReader is thread-safe.
    */
@@ -468,6 +473,10 @@ public class BSTIndex implements IndexMethod {
 
     private RowStoreDecoder rowStoreDecoder;
 
+    private AtomicBoolean inited = new AtomicBoolean(false);
+
+    volatile int referenceNum;
+
     /**
      *
      * @param fileName
@@ -488,6 +497,25 @@ public class BSTIndex implements IndexMethod {
       open();
     }
 
+    /**
+     * Increase the reference number of the index reader.
+     */
+    public void retain() {
+      REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum + 1);
+    }
+
+    /**
+     * Decrease the reference number of the index reader.
+     * This method must be called before {@link #close()}.
+     */
+    public void release() {
+      REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum - 1);
+    }
+
+    public int getReferenceNum() {
+      return referenceNum;
+    }
+
     public Schema getKeySchema() {
       return this.keySchema;
     }
@@ -543,8 +571,10 @@ public class BSTIndex implements IndexMethod {
       byteBuf.release();
     }
 
-    public void init() throws IOException {
-      fillData();
+    public synchronized void init() throws IOException {
+      if (inited.compareAndSet(false, true)) {
+        fillData();
+      }
     }
 
     private void open()
@@ -684,6 +714,8 @@ public class BSTIndex implements IndexMethod {
       } catch (IOException e) {
         //TODO this block should fix correctly
         counter--;
+        if (counter == 0)
+          LOG.info("counter: " + counter);
         if (pos != -1) {
           in.seek(pos);
         }
@@ -765,6 +797,9 @@ public class BSTIndex implements IndexMethod {
 
       //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
       int centerPos = (start + end) >>> 1;
+      if (arr.length == 0) {
+        LOG.error("arr.length: 0, loadNum: " + loadNum + ", inited: " + inited.get());
+      }
       while (true) {
         if (comparator.compare(arr[centerPos], key) > 0) {
           if (centerPos == 0) {
@@ -800,8 +835,23 @@ public class BSTIndex implements IndexMethod {
       return offset;
     }
 
+    /**
+     * Close index reader only when it is not used anymore.
+     */
     @Override
     public void close() throws IOException {
+      if (referenceNum == 0) {
+        this.indexIn.close();
+      }
+    }
+
+    /**
+     * Close index reader even though it is being used.
+     *
+     * @throws IOException
+     */
+    public void forceClose() throws IOException {
+      REFERENCE_UPDATER.compareAndSet(this, referenceNum, 0);
       this.indexIn.close();
     }
 


[2/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.

Posted by ji...@apache.org.
TAJO-1950: Query master uses too much memory during range shuffle.

Closes #884


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f9ae1da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f9ae1da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f9ae1da

Branch: refs/heads/master
Commit: 1f9ae1da0424731567cea18e975c47d4479b0ae9
Parents: e8ee7f2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 24 18:17:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 24 18:17:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |   4 +-
 .../apache/tajo/master/TestRepartitioner.java   |  81 ++---
 .../org/apache/tajo/worker/TestFetcher.java     |   2 +-
 .../apache/tajo/engine/query/TaskRequest.java   |   6 +-
 .../tajo/engine/query/TaskRequestImpl.java      |  13 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java |  32 --
 .../tajo/querymaster/DefaultTaskScheduler.java  |  13 +-
 .../tajo/querymaster/FetchScheduleEvent.java    |   7 +-
 .../apache/tajo/querymaster/Repartitioner.java  | 206 ++++++++-----
 .../java/org/apache/tajo/querymaster/Stage.java |   3 +-
 .../java/org/apache/tajo/querymaster/Task.java  |  48 ++-
 .../tajo/worker/ExecutionBlockContext.java      |  68 ++++-
 .../java/org/apache/tajo/worker/FetchImpl.java  | 140 ++++-----
 .../java/org/apache/tajo/worker/Fetcher.java    |  45 ++-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 174 ++++++-----
 tajo-core/src/main/proto/ResourceProtos.proto   |  14 +-
 .../src/main/resources/webapps/worker/task.jsp  |  17 +-
 .../tajo/pullserver/HttpDataServerHandler.java  |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  | 302 ++++++++++++++-----
 .../apache/tajo/storage/index/bst/BSTIndex.java |  54 +++-
 22 files changed, 811 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2edf9d7..c02bfc7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
+
     TAJO-1858: Aligning error message in execute query page of web UI is needed.
     (Byunghwa Yun via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9ab3dfa..29cf9ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -212,6 +212,10 @@ public class TajoConf extends Configuration {
 
     // Shuffle Configuration --------------------------------------------------
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
+    PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
+    PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+    PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
+        Validators.min("1")),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
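
The three new variables can be read (and overridden, e.g. in tajo-site.xml) through the usual TajoConf accessors; a minimal sketch with the defaults above:

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.conf.TajoConf.ConfVars;

    TajoConf conf = new TajoConf();
    int cacheSize  = conf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);            // 10000 readers
    int timeoutMin = conf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);         // 5 minutes
    int maxUrlLen  = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);  // StorageUnit.KB = 1024 bytes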

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index 279fce7..81eeb1f 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -88,7 +88,9 @@ public class TestHAServiceHDFSImpl  {
       assertEquals(2, fs.listStatus(activePath).length);
       assertEquals(0, fs.listStatus(backupPath).length);
     } finally {
-      backupMaster.stop();
+      if (backupMaster != null) {
+        backupMaster.stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index b0a3a17..c260ab6 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -29,13 +29,15 @@ import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.junit.Test;
 
 import java.net.URI;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static junit.framework.Assert.assertEquals;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -70,11 +72,9 @@ public class TestRepartitioner {
             new HashMap<>();
 
     for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
-      FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
+      FetchImpl fetch = new FetchImpl(sid.toString(), new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
           sid, eachEntry.getKey(), eachEntry.getValue());
 
-      fetch.setName(sid.toString());
-
       FetchProto proto = fetch.getProto();
       fetch = new FetchImpl(proto);
       assertEquals(proto, fetch.getProto());
@@ -84,7 +84,7 @@ public class TestRepartitioner {
 
       hashEntries.put(eachEntry.getKey(), ebEntries);
 
-      List<URI> uris = fetch.getURIs();
+      List<URI> uris = Repartitioner.createFullURIs(2 * StorageUnit.KB, fetch.getProto());
       assertEquals(1, uris.size());   //In Hash Suffle, Fetcher return only one URI per partition.
 
       URI uri = uris.get(0);
@@ -119,7 +119,7 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     FetchImpl [] fetches = new FetchImpl[12];
     for (int i = 0; i < 12; i++) {
-      fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
+      fetches[i] = new FetchImpl(tableName, new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
     }
 
     int [] VOLUMES = {100, 80, 70, 30, 10, 5};
@@ -128,37 +128,40 @@ public class TestRepartitioner {
       fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
     }
 
-    Pair<Long [], Map<String, List<FetchImpl>>[]> results;
+    FetchProto[] expectedProtos = new FetchProto[fetches.length];
+    expectedProtos = Arrays.stream(fetches).map(fetch -> fetch.getProto()).collect(Collectors.toList())
+        .toArray(expectedProtos);
+    Pair<Long [], Map<String, List<FetchProto>>[]> results;
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
     long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
     assertFetchVolumes(expected, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
     long expected0 [] = {140, 155};
     assertFetchVolumes(expected0, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
     long expected1 [] = {100, 95, 100};
     assertFetchVolumes(expected1, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
     long expected2 [] = {100, 80, 70, 45};
     assertFetchVolumes(expected2, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
     long expected3 [] = {100, 80, 70, 30, 15};
     assertFetchVolumes(expected3, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
     long expected4 [] = {100, 80, 70, 30, 10, 5};
     assertFetchVolumes(expected4, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
   }
 
   private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -191,7 +194,8 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(6, fetches.size());
 
@@ -199,10 +203,10 @@ public class TestRepartitioner {
     int index = 0;
     int numZeroPosFetcher = 0;
     long totalLength = 0;
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       totalInterms += eachFetchList.size();
       long eachFetchVolume = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         eachFetchVolume += eachFetch.getLength();
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
@@ -248,8 +252,9 @@ public class TestRepartitioner {
       intermediateEntries.add(interm);
     }
 
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(32, fetches.size());
 
@@ -258,15 +263,15 @@ public class TestRepartitioner {
     long totalLength = 0;
     Set<String> uniqPullHost = new HashSet<>();
 
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       long length = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
         }
         totalLength += eachFetch.getLength();
         length += eachFetch.getLength();
-        uniqPullHost.add(eachFetch.getPullHost().toString());
+        uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
       }
       assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
       if (index < fetches.size() - 1) {
@@ -378,7 +383,8 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 256 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, entries, splitVolume,
         10 * 1024 * 1024);
 
 
@@ -393,13 +399,13 @@ public class TestRepartitioner {
         {728355084,121760359},
     };
     int index = 0;
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       if (index == 3) {
         assertEquals(2, eachFetchList.size());
       } else {
         assertEquals(1, eachFetchList.size());
       }
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         assertEquals(expected[index][0], eachFetch.getOffset());
         assertEquals(expected[index][1], eachFetch.getLength());
         index++;
@@ -438,13 +444,14 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(32, fetches.size());
 
     int expectedSize = 0;
-    Set<FetchImpl> fetchSet = new HashSet<>();
-    for(List<FetchImpl> list : fetches){
+    Set<FetchProto> fetchSet = new HashSet<>();
+    for(List<FetchProto> list : fetches){
       expectedSize += list.size();
       fetchSet.addAll(list);
     }
@@ -456,15 +463,15 @@ public class TestRepartitioner {
     long totalLength = 0;
     Set<String> uniqPullHost = new HashSet<>();
 
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       long length = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
         }
         totalLength += eachFetch.getLength();
         length += eachFetch.getLength();
-        uniqPullHost.add(eachFetch.getPullHost().toString());
+        uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
       }
       assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
       if (index < fetches.size() - 1) {
@@ -482,25 +489,25 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     Task.PullHost pullHost = new Task.PullHost("localhost", 0);
 
-    FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
-    FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl expected = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl fetch2 = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
     assertEquals(expected, fetch2);
     fetch2.setOffset(5);
     fetch2.setLength(10);
     assertNotEquals(expected, fetch2);
   }
 
-  private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
-    Set<FetchImpl> expectedURLs = Sets.newHashSet();
+  private static void assertFetchProto(FetchProto [] expected, Map<String, List<FetchProto>>[] result) {
+    Set<FetchProto> expectedURLs = Sets.newHashSet();
 
-    for (FetchImpl f : expected) {
+    for (FetchProto f : expected) {
       expectedURLs.add(f);
     }
 
-    Set<FetchImpl> resultURLs = Sets.newHashSet();
+    Set<FetchProto> resultURLs = Sets.newHashSet();
 
-    for (Map<String, List<FetchImpl>> e : result) {
-      for (List<FetchImpl> list : e.values()) {
+    for (Map<String, List<FetchProto>> e : result) {
+      for (List<FetchProto> list : e.values()) {
         resultURLs.addAll(list);
       }
     }
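
A note on what these tests now pin down: FetchProto is the canonical wire form, so a FetchImpl rebuilt
from its proto must yield an identical proto. A minimal round-trip sketch, using only constructors and
getters shown in this patch:

  ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
  FetchImpl fetch = new FetchImpl("t1", new Task.PullHost("localhost", 10000), HASH_SHUFFLE, ebId, 0);
  FetchProto proto = fetch.getProto();                   // serialize to the wire format
  assertEquals(proto, new FetchImpl(proto).getProto());  // rebuild and compare by value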

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
index a91fc30..dfc37b0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -90,7 +90,7 @@ public class TestFetcher {
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
     final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    FileChunk chunk = fetcher.get();
+    FileChunk chunk = fetcher.get().get(0);
     assertNotNull(chunk);
     assertNotNull(chunk.getFile());
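
The fetcher now appears to return a list of chunks rather than a single chunk (hence the get(0) above),
since one fetch may be materialized as several file chunks. A hedged sketch that consumes every chunk,
reusing the names from this test:

  for (FileChunk each : fetcher.get()) {  // iterate over all chunks produced by the fetch
    assertNotNull(each.getFile());
  }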
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index 48d4780..ef4ff60 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -21,6 +21,7 @@
  */
 package org.apache.tajo.engine.query;
 
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -29,7 +30,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.util.List;
 
@@ -39,8 +39,8 @@ public interface TaskRequest extends ProtoObject<TaskRequestProto> {
 	List<CatalogProtos.FragmentProto> getFragments();
 	PlanProto.LogicalNodeTree getPlan();
 	void setInterQuery();
-	void addFetch(String name, FetchImpl fetch);
-	List<FetchImpl> getFetches();
+	void addFetch(FetchProto fetch);
+	List<FetchProto> getFetches();
   QueryContext getQueryContext(TajoConf conf);
   DataChannel getDataChannel();
   Enforcer getEnforcer();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index 7b52dab..fc7556c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -42,7 +42,7 @@ public class TaskRequestImpl implements TaskRequest {
 	private boolean clusteredOutput;
 	private PlanProto.LogicalNodeTree plan;     // logical node
 	private Boolean interQuery;
-	private List<FetchImpl> fetches;
+	private List<FetchProto> fetches;
 	private QueryContext queryContext;
 	private DataChannel dataChannel;
 	private Enforcer enforcer;
@@ -157,10 +157,10 @@ public class TaskRequestImpl implements TaskRequest {
 	  this.interQuery = true;
 	}
 
-  public void addFetch(String name, FetchImpl fetch) {
+  @Override
+  public void addFetch(FetchProto fetch) {
     maybeInitBuilder();
     initFetches();
-    fetch.setName(name);
     fetches.add(fetch);
   }
 
@@ -212,7 +212,8 @@ public class TaskRequestImpl implements TaskRequest {
     return this.enforcer;
   }
 
-  public List<FetchImpl> getFetches() {
+  @Override
+  public List<FetchProto> getFetches() {
 	  initFetches();    
 
     return this.fetches;
@@ -225,7 +226,7 @@ public class TaskRequestImpl implements TaskRequest {
     TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
     this.fetches = new ArrayList<>();
     for(FetchProto fetch : p.getFetchesList()) {
-      fetches.add(new FetchImpl(fetch));
+      fetches.add(fetch);
     }
 	}
 
@@ -259,7 +260,7 @@ public class TaskRequestImpl implements TaskRequest {
 		}
     if (this.fetches != null) {
       for (int i = 0; i < fetches.size(); i++) {
-        builder.addFetches(fetches.get(i).getProto());
+        builder.addFetches(fetches.get(i));
       }
     }
     if (this.queryMasterHostAndPort != null) {

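Holding FetchProto directly means each fetch is serialized once, where it is built, and the request
builder can then add the messages as-is. A minimal usage sketch (taskRequest and ebId are assumed to be
in scope; the fetch construction mirrors other call sites in this patch):

  FetchImpl fetch = new FetchImpl("t1", new Task.PullHost("localhost", 10000), HASH_SHUFFLE, ebId, 0);
  taskRequest.addFetch(fetch.getProto());               // the name now travels inside the proto
  List<FetchProto> fetches = taskRequest.getFetches();  // returned without re-parsing
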
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index bae04c8..31c23f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.utils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
@@ -29,47 +28,16 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 
 public class TupleUtil {
   private static final Log LOG = LogFactory.getLog(TupleUtil.class);
 
-  public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
-      throws UnsupportedEncodingException {
-    return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema));
-  }
-
-  public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
-      throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-    byte [] firstKeyBytes = encoder.toBytes(range.getStart());
-    byte [] endKeyBytes = encoder.toBytes(range.getEnd());
-
-    String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
-    String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
-
-    sb.append("start=")
-        .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
-        .append("&")
-        .append("end=")
-        .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
-
-    if (last) {
-      sb.append("&final=true");
-    }
-
-    return sb.toString();
-  }
-
   /**
    * if max value is null, set ranges[last]
    * @param sortSpecs

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f1c0f62..26dc103 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -52,7 +52,6 @@ import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -233,11 +232,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+        Map<String, List<FetchProto>> fetches = castEvent.getFetches();
         TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
         Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
         scheduledObjectNum++;
-        for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+        for (Entry<String, List<FetchProto>> eachFetch : fetches.entrySet()) {
           task.addFetches(eachFetch.getKey(), eachFetch.getValue());
           task.addFragment(fragmentsForNonLeafTask[0], true);
           if (fragmentsForNonLeafTask[1] != null) {
@@ -983,11 +982,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
-          for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
-            Collection<FetchImpl> fetches = entry.getValue();
+          for(Map.Entry<String, Set<FetchProto>> entry: task.getFetchMap().entrySet()) {
+            Collection<FetchProto> fetches = entry.getValue();
             if (fetches != null) {
-              for (FetchImpl fetch : fetches) {
-                taskAssign.addFetch(entry.getKey(), fetch);
+              for (FetchProto fetch : fetches) {
+                taskAssign.addFetch(fetch);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
index 5fe2f80..e4e63b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.querymaster;
 
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -26,15 +27,15 @@ import java.util.List;
 import java.util.Map;
 
 public class FetchScheduleEvent extends TaskSchedulerEvent {
-  private final Map<String, List<FetchImpl>> fetches;
+  private final Map<String, List<FetchProto>> fetches;  // map of table name and fetch list
 
   public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
-                            final Map<String, List<FetchImpl>> fetches) {
+                            final Map<String, List<FetchProto>> fetches) {
     super(eventType, blockId);
     this.fetches = fetches;
   }
 
-  public Map<String, List<FetchImpl>> getFetches() {
+  public Map<String, List<FetchProto>> getFetches() {
     return fetches;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index e64cd51..bd8311f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -25,8 +25,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -48,7 +50,9 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
@@ -57,13 +61,17 @@ import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
 import java.net.URI;
+import java.net.URLEncoder;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -75,7 +83,6 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
 public class Repartitioner {
   private static final Log LOG = LogFactory.getLog(Repartitioner.class);
 
-  private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
   private final static String UNKNOWN_HOST = "unknown";
 
   public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
@@ -546,12 +553,13 @@ public class Repartitioner {
 
   private static void addJoinShuffle(Stage stage, int partitionId,
                                      Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
-    Map<String, List<FetchImpl>> fetches = new HashMap<>();
+    Map<String, List<FetchProto>> fetches = new HashMap<>();
     for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
       if (grouppedPartitions.containsKey(execBlock.getId())) {
-        Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
+        String name = execBlock.getId().toString();
+        List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE,
             grouppedPartitions.get(execBlock.getId()));
-        fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
+        fetches.put(name, requests);
       }
     }
 
@@ -568,9 +576,10 @@ public class Repartitioner {
    *
    * @return key: pullserver's address, value: a list of requests
    */
-  private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
-                                                          ShuffleType type,
-                                                          List<IntermediateEntry> partitions) {
+  private static List<FetchProto> mergeShuffleRequest(final String fetchName,
+                                                      final int partitionId,
+                                                      final ShuffleType type,
+                                                      final List<IntermediateEntry> partitions) {
    // ebId + pullHost -> FetchImpl
     Map<String, FetchImpl> mergedPartitions = new HashMap<>();
 
@@ -582,12 +591,15 @@ public class Repartitioner {
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
       } else {
         // In some cases like union each IntermediateEntry has different EBID.
-        FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId);
+        FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId);
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
         mergedPartitions.put(mergedKey, fetch);
       }
     }
-    return mergedPartitions.values();
+
+    return mergedPartitions.values().stream()
+        .map(fetch -> fetch.getProto())
+        .collect(Collectors.toList());
   }
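
The merge loop above is effectively a group-by on (ebId, pull host): all intermediate entries sharing
both become parts of a single fetch. A behavioral sketch of the same grouping with Map.computeIfAbsent
(the key format here is an assumption; the real key is built elsewhere in this method):

  Map<String, FetchImpl> merged = new HashMap<>();
  for (IntermediateEntry p : partitions) {
    String key = p.getEbId() + "," + p.getPullHost();   // hypothetical key format
    merged.computeIfAbsent(key,
        k -> new FetchImpl(fetchName, p.getPullHost(), type, p.getEbId(), partitionId))
        .addPart(p.getTaskId(), p.getAttemptId());
  }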
 
   public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -699,44 +711,44 @@ public class Repartitioner {
         new String[]{UNKNOWN_HOST});
     Stage.scheduleFragment(stage, dummyFragment);
 
-    List<FetchImpl> fetches = new ArrayList<>();
+    Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>();
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
     for (ExecutionBlock childBlock : childBlocks) {
       Stage childExecSM = stage.getContext().getStage(childBlock.getId());
       for (Task qu : childExecSM.getTasks()) {
         for (IntermediateEntry p : qu.getIntermediateData()) {
-          FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
-          fetch.addPart(p.getTaskId(), p.getAttemptId());
-          fetches.add(fetch);
+          Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId());
+          if (fetches.containsKey(key)) {
+            fetches.get(key).addPart(p.getTaskId(), p.getAttemptId());
+          } else {
+            FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+            fetch.addPart(p.getTaskId(), p.getAttemptId());
+            fetches.put(key, fetch);
+          }
         }
       }
     }
 
-    SortedMap<TupleRange, Collection<FetchImpl>> map;
+    SortedMap<TupleRange, Collection<FetchProto>> map;
     map = new TreeMap<>();
 
-    Set<FetchImpl> fetchSet;
-    try {
-      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
-      for (int i = 0; i < ranges.length; i++) {
-        fetchSet = new HashSet<>();
-        for (FetchImpl fetch: fetches) {
-          String rangeParam =
-              TupleUtil.rangeToQuery(ranges[i], i == (ranges.length - 1) , encoder);
-          FetchImpl copy = null;
-          try {
-            copy = fetch.clone();
-          } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
-          }
-          copy.setRangeParams(rangeParam);
-          fetchSet.add(copy);
+    Set<FetchProto> fetchSet;
+    RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
+    for (int i = 0; i < ranges.length; i++) {
+      fetchSet = new HashSet<>();
+      RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder);
+      for (FetchImpl fetch : fetches.values()) {
+        FetchImpl copy = null;
+        try {
+          copy = fetch.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new RuntimeException(e);
         }
-        map.put(ranges[i], fetchSet);
+        copy.setRangeParams(rangeParam);
+        fetchSet.add(copy.getProto());
       }
 
-    } catch (UnsupportedEncodingException e) {
-      LOG.error(e);
+      map.put(ranges[i], fetchSet);
     }
 
     scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
@@ -744,20 +756,20 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
-                                                   String tableName, int num) {
+  public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions,
+                                                 String tableName, int num) {
     int i;
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
     for (i = 0; i < num; i++) {
       fetchesArray[i] = new HashMap<>();
     }
     i = 0;
-    for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
-      Collection<FetchImpl> value = entry.getValue();
+    for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) {
+      Collection<FetchProto> value = entry.getValue();
       TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
       if (i == num) i = 0;
     }
-    for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
+    for (Map<String, List<FetchProto>> eachFetches : fetchesArray) {
       Stage.scheduleFetches(stage, eachFetches);
     }
   }
@@ -785,6 +797,10 @@ public class Repartitioner {
       return totalVolume;
     }
 
+    public List<FetchProto> getFetchProtos() {
+      return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList());
+    }
+
   }
 
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
@@ -821,7 +837,7 @@ public class Repartitioner {
         Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
         for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
 
-          FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+          FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(),
               block.getId(), interm.getKey(), e.getValue());
 
           long volumeSum = 0;
@@ -891,20 +907,16 @@ public class Repartitioner {
     }
   }
 
-  public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
+  public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl(
       Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
 
     // Sort fetchGroupMeta in a descending order of data volumes.
     List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
-    Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
-      @Override
-      public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
-        return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0);
-      }
-    });
+    Collections.sort(fetchGroupMetaList, (o1, o2) ->
+        o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0));
 
     // Initialize containers
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
     Long [] assignedVolumes = new Long[num];
     // initialization
     for (int i = 0; i < num; i++) {
@@ -925,7 +937,7 @@ public class Repartitioner {
         FetchGroupMeta fetchGroupMeta = iterator.next();
         assignedVolumes[p] += fetchGroupMeta.getVolume();
 
-        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
         p++;
       }
 
@@ -933,13 +945,13 @@ public class Repartitioner {
       while (p >= 0 && iterator.hasNext()) {
         FetchGroupMeta fetchGroupMeta = iterator.next();
         assignedVolumes[p] += fetchGroupMeta.getVolume();
-        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
 
        // While the current bucket holds less than the previous one, keep adding fetches to it.
         while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) {
           FetchGroupMeta additionalFetchGroup = iterator.next();
           assignedVolumes[p] += additionalFetchGroup.getVolume();
-          TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
+          TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos());
         }
 
         p--;
@@ -951,9 +963,9 @@ public class Repartitioner {
 
   public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
                                                              String tableName, int num) {
-    Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
+    Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
     // Schedule FetchImpls
-    for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
+    for (Map<String, List<FetchProto>> eachFetches : fetchsArray) {
       Stage.scheduleFetches(stage, eachFetches);
     }
   }
@@ -976,7 +988,7 @@ public class Repartitioner {
       throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than " +
           "tajo.shuffle.hash.appender.page.volumn-mb");
     }
-    List<List<FetchImpl>> fetches = new ArrayList<>();
+    List<List<FetchProto>> fetches = new ArrayList<>();
 
     long totalIntermediateSize = 0L;
     for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) {
@@ -996,7 +1008,7 @@ public class Repartitioner {
 
       // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
       for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) {
-        List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
+        List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries,
             splitVolume, pageSize);
         if (eachFetches != null && !eachFetches.isEmpty()) {
           fetches.addAll(eachFetches);
@@ -1007,8 +1019,8 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(fetches.size());
 
     int i = 0;
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
-    for(List<FetchImpl> entry : fetches) {
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()];
+    for(List<FetchProto> entry : fetches) {
       fetchesArray[i] = new HashMap<>();
       fetchesArray[i].put(tableName, entry);
 
@@ -1030,16 +1042,16 @@ public class Repartitioner {
    * @param splitVolume
    * @return
    */
-  public static List<List<FetchImpl>> splitOrMergeIntermediates(
+  public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull String fetchName,
       ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
    // Each inner List<FetchProto> amounts to at most splitVolume bytes.
-    List<List<FetchImpl>> fetches = new ArrayList<>();
+    List<List<FetchProto>> fetches = new ArrayList<>();
 
     Iterator<IntermediateEntry> iter = entries.iterator();
     if (!iter.hasNext()) {
       return null;
     }
-    List<FetchImpl> fetchListForSingleTask = new ArrayList<>();
+    List<FetchProto> fetchListForSingleTask = new ArrayList<>();
     long fetchListVolume = 0;
 
     while (iter.hasNext()) {
@@ -1065,11 +1077,11 @@ public class Repartitioner {
           fetchListForSingleTask = new ArrayList<>();
           fetchListVolume = 0;
         }
-        FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+        FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
             ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
         fetch.setOffset(eachSplit.getFirst());
         fetch.setLength(eachSplit.getSecond());
-        fetchListForSingleTask.add(fetch);
+        fetchListForSingleTask.add(fetch.getProto());
         fetchListVolume += eachSplit.getSecond();
       }
     }
@@ -1079,19 +1091,56 @@ public class Repartitioner {
     return fetches;
   }
 
-  public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
+  /**
+   * Get the pull server URIs.
+   */
+  public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) {
+    return createFetchURL(maxUrlLength, fetch, true);
+  }
+
+  /**
+   * Get the pull server URIs without repeated parameters.
+   */
+  public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) {
+    return createFetchURL(maxUrlLength, fetch, false);
+  }
+
+  private static String getRangeParam(FetchProto proto) {
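+    // Range keys are row-store-encoded tuple bytes; Base64 makes them printable,
+    // and URL-encoding keeps the Base64 text safe inside a query string.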
+    StringBuilder sb = new StringBuilder();
+    String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray()));
+    String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray()));
+
+    try {
+      sb.append("start=")
+          .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
+          .append("&")
+          .append("end=")
+          .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (proto.getRangeLastInclusive()) {
+      sb.append("&final=true");
+    }
+
+    return sb.toString();
+  }
+
+  public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
     String scheme = "http://";
 
     StringBuilder urlPrefix = new StringBuilder(scheme);
-    urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
-        .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
-        .append("&sid=").append(fetch.getExecutionBlockId().getId())
+    ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
+    urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
+        .append("qid=").append(ebId.getQueryId().toString())
+        .append("&sid=").append(ebId.getId())
         .append("&p=").append(fetch.getPartitionId())
         .append("&type=");
     if (fetch.getType() == HASH_SHUFFLE) {
       urlPrefix.append("h");
     } else if (fetch.getType() == RANGE_SHUFFLE) {
-      urlPrefix.append("r").append("&").append(fetch.getRangeParams());
+      urlPrefix.append("r").append("&").append(getRangeParam(fetch));
     } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
       urlPrefix.append("s");
     }
@@ -1105,17 +1154,26 @@ public class Repartitioner {
       if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
         fetchURLs.add(URI.create(urlPrefix.toString()));
       } else {
+        urlPrefix.append("&ta=");
        // If the GET request URI grows longer than the configured maximum (maxUrlLength),
        // it may cause HTTP status code 414 (Request-URI Too Long).
        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
        // The code below splits such a long request into multiple requests.
         List<String> taskIdsParams = new ArrayList<>();
         StringBuilder taskIdListBuilder = new StringBuilder();
-        List<Integer> taskIds = fetch.getTaskIds();
-        List<Integer> attemptIds = fetch.getAttemptIds();
+        
+        final List<Integer> taskIds = fetch.getTaskIdList();
+        final List<Integer> attemptIds = fetch.getAttemptIdList();
+
+        // Sort task ids to increase cache hits in the pull server
+        final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
+            .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+            .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+            .collect(Collectors.toList());
+
         boolean first = true;
 
-        for (int i = 0; i < taskIds.size(); i++) {
+        for (int i = 0; i < taskAndAttemptIds.size(); i++) {
           StringBuilder taskAttemptId = new StringBuilder();
 
          if (!first) { // separate task ids with a comma
@@ -1124,17 +1182,16 @@ public class Repartitioner {
             first = false;
           }
 
-          int taskId = taskIds.get(i);
+          int taskId = taskAndAttemptIds.get(i).getFirst();
           if (taskId < 0) {
             // In the case of hash shuffle each partition has single shuffle file per worker.
             // TODO If file is large, consider multiple fetching(shuffle file can be split)
             continue;
           }
-          int attemptId = attemptIds.get(i);
+          int attemptId = taskAndAttemptIds.get(i).getSecond();
           taskAttemptId.append(taskId).append("_").append(attemptId);
 
-          if (taskIdListBuilder.length() + taskAttemptId.length()
-              > HTTP_REQUEST_MAXIMUM_LENGTH) {
+          if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
             taskIdsParams.add(taskIdListBuilder.toString());
             taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
           } else {
@@ -1145,7 +1202,6 @@ public class Repartitioner {
         if (taskIdListBuilder.length() > 0) {
           taskIdsParams.add(taskIdListBuilder.toString());
         }
-        urlPrefix.append("&ta=");
         for (String param : taskIdsParams) {
           fetchURLs.add(URI.create(urlPrefix + param));
         }
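
The loop above bounds every URI by maxUrlLength, flushing the accumulated task-id list whenever the next
id would overflow it. A standalone sketch of the same batching idea (names are hypothetical, not Tajo API):

  static List<URI> splitByLength(String urlPrefix, List<String> ids, int maxUrlLength) {
    List<URI> uris = new ArrayList<>();
    StringBuilder batch = new StringBuilder();
    for (String id : ids) {
      // flush the current batch if appending this id would exceed the limit
      if (batch.length() > 0 && urlPrefix.length() + batch.length() + id.length() + 1 > maxUrlLength) {
        uris.add(URI.create(urlPrefix + batch));
        batch = new StringBuilder();
      }
      if (batch.length() > 0) {
        batch.append(',');
      }
      batch.append(id);
    }
    if (batch.length() > 0) {
      uris.add(URI.create(urlPrefix + batch));
    }
    return uris;
  }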

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 254df64..5f050bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -65,7 +65,6 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -1189,7 +1188,7 @@ public class Stage implements EventHandler<StageEvent> {
         stage.getId(), leftFragment, rightFragments));
   }
 
-  public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+  public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) {
     stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
         stage.getId(), fetches));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 95a7170..9d038ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
@@ -46,7 +47,6 @@ import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.net.URI;
 import java.util.*;
@@ -55,8 +55,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.ResourceProtos.*;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
 public class Task implements EventHandler<TaskEvent> {
   /** Class Logger */
@@ -70,7 +70,7 @@ public class Task implements EventHandler<TaskEvent> {
 	private List<ScanNode> scan;
 	
 	private Map<String, Set<FragmentProto>> fragMap;
-	private Map<String, Set<FetchImpl>> fetchMap;
+	private Map<String, Set<FetchProto>> fetchMap;
 
   private int totalFragmentNum;
 
@@ -100,6 +100,8 @@ public class Task implements EventHandler<TaskEvent> {
 
   private TaskHistory finalTaskHistory;
 
+  private final int maxUrlLength;
+
   protected static final StateMachineFactory
       <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
       new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
@@ -207,6 +209,8 @@ public class Task implements EventHandler<TaskEvent> {
 
     stateMachine = stateMachineFactory.make(this);
     totalFragmentNum = 0;
+    maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+        ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
 	}
 
   public boolean isLeafTask() {
@@ -282,9 +286,9 @@ public class Task implements EventHandler<TaskEvent> {
     taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
 
     List<String[]> fetchList = new ArrayList<>();
-    for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
-      for (FetchImpl f : e.getValue()) {
-        for (URI uri : f.getSimpleURIs()){
+    for (Map.Entry<String, Set<FetchProto>> e : getFetchMap().entrySet()) {
+      for (FetchProto f : e.getValue()) {
+        for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)) {
           fetchList.add(new String[] {e.getKey(), uri.toString()});
         }
       }
@@ -364,8 +368,8 @@ public class Task implements EventHandler<TaskEvent> {
     return succeededWorker;
   }
 	
-	public void addFetches(String tableId, Collection<FetchImpl> fetches) {
-	  Set<FetchImpl> fetchSet;
+	public void addFetches(String tableId, Collection<FetchProto> fetches) {
+	  Set<FetchProto> fetchSet;
     if (fetchMap.containsKey(tableId)) {
       fetchSet = fetchMap.get(tableId);
     } else {
@@ -375,7 +379,7 @@ public class Task implements EventHandler<TaskEvent> {
     fetchMap.put(tableId, fetchSet);
 	}
 	
-	public void setFetches(Map<String, Set<FetchImpl>> fetches) {
+	public void setFetches(Map<String, Set<FetchProto>> fetches) {
 	  this.fetchMap.clear();
 	  this.fetchMap.putAll(fetches);
 	}
@@ -395,27 +399,15 @@ public class Task implements EventHandler<TaskEvent> {
 	public TaskId getId() {
 		return taskId;
 	}
-	
-	public Collection<FetchImpl> getFetchHosts(String tableId) {
-	  return fetchMap.get(tableId);
-	}
-	
-	public Collection<Set<FetchImpl>> getFetches() {
+
+	public Collection<Set<FetchProto>> getFetches() {
 	  return fetchMap.values();
 	}
 
-  public Map<String, Set<FetchImpl>> getFetchMap() {
+  public Map<String, Set<FetchProto>> getFetchMap() {
     return fetchMap;
   }
-	
-	public Collection<FetchImpl> getFetch(ScanNode scan) {
-	  return this.fetchMap.get(scan.getTableName());
-	}
-	
-	public ScanNode[] getScanNodes() {
-	  return this.scan.toArray(new ScanNode[scan.size()]);
-	}
-	
+
 	@Override
 	public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -426,10 +418,10 @@ public class Task implements EventHandler<TaskEvent> {
         builder.append(fragment).append(", ");
       }
 		}
-		for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+		for (Entry<String, Set<FetchProto>> e : fetchMap.entrySet()) {
       builder.append(e.getKey()).append(" : ");
-      for (FetchImpl t : e.getValue()) {
-        for (URI uri : t.getURIs()){
+      for (FetchProto t : e.getValue()) {
+        for (URI uri : Repartitioner.createFullURIs(maxUrlLength, t)){
           builder.append(uri).append(" ");
         }
       }
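
The URL length cap in Task is now read from configuration instead of the removed 1900-character constant,
so deployments behind proxies with stricter URI limits can tune it. A hedged example of overriding it,
matching the lookup the constructor above performs:

  TajoConf conf = new TajoConf();
  conf.setInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(), 1900);  // same key the constructor reads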

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 65cb6ac..098567a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,6 +20,12 @@ package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,20 +38,20 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.AsyncRpcClient;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -191,10 +197,64 @@ public class ExecutionBlockContext {
     }
     tasks.clear();
     taskHistories.clear();
+
+    // Clear index cache in pull server
+    clearIndexCache();
+
     resource.release();
     RpcClientManager.cleanup(queryMasterClient);
   }
 
+  /**
+   * Send a request to {@link TajoPullServerService} to clear index cache
+   */
+  private void clearIndexCache() {
+    // Avoid unnecessary cache clear request when the current eb is a leaf eb
+    if (executionBlockId.getId() > 1) {
+      Bootstrap bootstrap = new Bootstrap()
+          .group(NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, 1))
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+          .option(ChannelOption.TCP_NODELAY, true);
+      ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
+        @Override
+        protected void initChannel(Channel channel) throws Exception {
+          ChannelPipeline pipeline = channel.pipeline();
+          pipeline.addLast("codec", new HttpClientCodec());
+        }
+      };
+      bootstrap.handler(initializer);
+
+      WorkerConnectionInfo connInfo = workerContext.getConnectionInfo();
+      ChannelFuture future = bootstrap.connect(new InetSocketAddress(connInfo.getHost(), connInfo.getPullServerPort()))
+          .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+      try {
+        Channel channel = future.await().channel();
+        if (!future.isSuccess()) {
+          // Upon failure to connect to pull server, cache clear message is just ignored.
+          LOG.warn(future.cause());
+          return;
+        }
+
+        // Example of URI: /ebid=eb_1450063997899_0015_000002
+        ExecutionBlockId clearEbId = new ExecutionBlockId(executionBlockId.getQueryId(), executionBlockId.getId() - 1);
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, "ebid=" + clearEbId.toString());
+        request.headers().set(Names.HOST, connInfo.getHost());
+        request.headers().set(Names.CONNECTION, Values.CLOSE);
+        channel.writeAndFlush(request);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (future != null && future.channel().isOpen()) {
+          // Close the channel to exit.
+          future.channel().close();
+        }
+      }
+    }
+  }
+
   public TajoConf getConf() {
     return systemConf;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 7d2033c..b49d449 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -21,16 +21,21 @@ package org.apache.tajo.worker;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 
-import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 
@@ -42,8 +47,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
   private ShuffleType type; // hash or range partition method.
   private ExecutionBlockId executionBlockId;   // The executionBlock id
   private int partitionId;                     // The hash partition id
-  private String name;                         // The intermediate source name
-  private String rangeParams;                  // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+  private final String name;                   // The intermediate source name
+  private RangeParam rangeParam;               // optional, range parameter for range shuffle
   private boolean hasNext = false;             // optional, if true, has more taskIds
 
   private List<Integer> taskIds;               // repeated, the task ids
@@ -52,19 +57,48 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
   private long offset = -1;
   private long length = -1;
 
-  public FetchImpl() {
-    taskIds = new ArrayList<>();
-    attemptIds = new ArrayList<>();
+  public static class RangeParam {
+    private byte[] start;
+    private byte[] end;
+    private boolean lastInclusive;
+
+    public RangeParam(TupleRange range, boolean lastInclusive, RowStoreEncoder encoder) {
+      this.start = encoder.toBytes(range.getStart());
+      this.end = encoder.toBytes(range.getEnd());
+      this.lastInclusive = lastInclusive;
+    }
+
+    public RangeParam(byte[] start, byte[] end, boolean lastInclusive) {
+      this.start = start;
+      this.end = end;
+      this.lastInclusive = lastInclusive;
+    }
+
+    public byte[] getStart() {
+      return start;
+    }
+
+    public byte[] getEnd() {
+      return end;
+    }
+
+    public boolean isLastInclusive() {
+      return lastInclusive;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(start), Arrays.hashCode(end), lastInclusive);
+    }
   }
 
   public FetchImpl(FetchProto proto) {
-    this(new Task.PullHost(proto.getHost(), proto.getPort()),
+    this(proto.getName(),
+        new Task.PullHost(proto.getHost(), proto.getPort()),
         proto.getType(),
         new ExecutionBlockId(proto.getExecutionBlockId()),
         proto.getPartitionId(),
-        proto.getRangeParams(),
         proto.getHasNext(),
-        proto.getName(),
         proto.getTaskIdList(), proto.getAttemptIdList());
 
     if (proto.hasOffset()) {
@@ -74,31 +108,41 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     if (proto.hasLength()) {
       this.length = proto.getLength();
     }
+
+    if (proto.hasRangeStart()) {
+      this.rangeParam = new RangeParam(proto.getRangeStart().toByteArray(),
+          proto.getRangeEnd().toByteArray(), proto.getRangeLastInclusive());
+    }
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                    int partitionId) {
-    this(host, type, executionBlockId, partitionId, null, false, null,
-            new ArrayList<>(), new ArrayList<>());
+    this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>());
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                    int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
-    this(host, type, executionBlockId, partitionId, null, false, null,
+    this(name, host, type, executionBlockId, partitionId, null, false,
             new ArrayList<>(), new ArrayList<>());
     for (Task.IntermediateEntry entry : intermediateEntryList){
       addPart(entry.getTaskId(), entry.getAttemptId());
     }
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
-                   int partitionId, String rangeParams, boolean hasNext, String name,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, boolean hasNext,
+                   List<Integer> taskIds, List<Integer> attemptIds) {
+    this(name, host, type, executionBlockId, partitionId, null, hasNext, taskIds, attemptIds);
+  }
+
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, RangeParam rangeParam, boolean hasNext,
                    List<Integer> taskIds, List<Integer> attemptIds) {
     this.host = host;
     this.type = type;
     this.executionBlockId = executionBlockId;
     this.partitionId = partitionId;
-    this.rangeParams = rangeParams;
+    this.rangeParam = rangeParam;
     this.hasNext = hasNext;
     this.name = name;
     this.taskIds = taskIds;
@@ -107,7 +151,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
+    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParam,
         hasNext, taskIds, attemptIds, offset, length);
   }
 
@@ -123,11 +167,14 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     builder.setHasNext(hasNext);
     builder.setName(name);
 
-    if (rangeParams != null && !rangeParams.isEmpty()) {
-      builder.setRangeParams(rangeParams);
+    if (rangeParam != null) {
+      builder.setRangeStart(ByteString.copyFrom(rangeParam.getStart()));
+      builder.setRangeEnd(ByteString.copyFrom(rangeParam.getEnd()));
+      builder.setRangeLastInclusive(rangeParam.isLastInclusive());
     }
 
     Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+
     builder.addAllTaskId(taskIds);
     builder.addAllAttemptId(attemptIds);
 
@@ -141,10 +188,6 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.attemptIds.add(attemptId);
   }
 
-  public Task.PullHost getPullHost() {
-    return this.host;
-  }
-
   public ExecutionBlockId getExecutionBlockId() {
     return executionBlockId;
   }
@@ -153,20 +196,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.executionBlockId = executionBlockId;
   }
 
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(int partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  public String getRangeParams() {
-    return rangeParams;
-  }
-
-  public void setRangeParams(String rangeParams) {
-    this.rangeParams = rangeParams;
+  public void setRangeParams(RangeParam rangeParams) {
+    this.rangeParam = rangeParams;
   }
 
   public boolean hasNext() {
@@ -185,36 +216,10 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.type = type;
   }
 
-  /**
-   * Get the pull server URIs.
-   */
-  public List<URI> getURIs(){
-    return Repartitioner.createFetchURL(this, true);
-  }
-
-  /**
-   * Get the pull server URIs without repeated parameters.
-   */
-  public List<URI> getSimpleURIs(){
-    return Repartitioner.createFetchURL(this, false);
-  }
-
   public String getName() {
     return name;
   }
 
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public List<Integer> getTaskIds() {
-    return taskIds;
-  }
-
-  public List<Integer> getAttemptIds() {
-    return attemptIds;
-  }
-
   public long getOffset() {
     return offset;
   }
@@ -238,8 +243,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     newFetchImpl.type = type;
     newFetchImpl.executionBlockId = executionBlockId;
     newFetchImpl.partitionId = partitionId;
-    newFetchImpl.name = name;
-    newFetchImpl.rangeParams = rangeParams;
+    newFetchImpl.rangeParam = rangeParam;
     newFetchImpl.hasNext = hasNext;
     if (taskIds != null) {
       newFetchImpl.taskIds = Lists.newArrayList(taskIds);
@@ -269,7 +273,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
         TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
         TUtil.checkEquals(host, fetch.host) &&
         TUtil.checkEquals(name, fetch.name) &&
-        TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+        TUtil.checkEquals(rangeParam, fetch.rangeParam) &&
         TUtil.checkEquals(taskIds, fetch.taskIds) &&
         TUtil.checkEquals(type, fetch.type) &&
         TUtil.checkEquals(offset, fetch.offset) &&

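A note on the representation above: the old rangeParams field kept each range
as an URL-encoded "start=xx&end=yy" string, one copy per fetch tracked by the
query master; RangeParam instead keeps the two boundary tuples as the raw
byte[] produced by a RowStoreEncoder, which serialize straight into the new
rangeStart/rangeEnd bytes fields of FetchProto. Below is a minimal sketch of
building one, assuming RowStoreUtil.createEncoder(Schema) from Tajo's storage
API; the schema, column name, and key values are invented for illustration:

  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.common.TajoDataTypes.Type;
  import org.apache.tajo.datum.Datum;
  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.storage.RowStoreUtil;
  import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
  import org.apache.tajo.storage.Tuple;
  import org.apache.tajo.storage.VTuple;
  import org.apache.tajo.worker.FetchImpl.RangeParam;

  public class RangeParamSketch {
    public static void main(String[] args) {
      // One-column sort key; the column name and key values are hypothetical.
      Schema keySchema = new Schema();
      keySchema.addColumn("l_orderkey", Type.INT8);
      RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);

      Tuple start = new VTuple(new Datum[]{DatumFactory.createInt8(1L)});
      Tuple end = new VTuple(new Datum[]{DatumFactory.createInt8(1000L)});

      // The same byte[] constructor FetchImpl(FetchProto) uses when it
      // rebuilds a range from rangeStart/rangeEnd on the worker side.
      RangeParam param = new RangeParam(encoder.toBytes(start), encoder.toBytes(end), true);
      System.out.println(param.getStart().length + "/" + param.getEnd().length + " key bytes");
    }
  }
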
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index b5abffe..250b4cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
 
@@ -42,6 +44,8 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -67,6 +71,7 @@ public class Fetcher {
   private TajoProtos.FetcherState state;
 
   private Bootstrap bootstrap;
+  private List<Long> chunkLengths = new ArrayList<>();
 
   public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
     this.uri = uri;
@@ -97,9 +102,6 @@ public class Fetcher {
               conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
           .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
           .option(ChannelOption.TCP_NODELAY, true);
-
-      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
-      bootstrap.handler(initializer);
     }
   }
 
@@ -123,12 +125,20 @@ public class Fetcher {
     return messageReceiveCount;
   }
 
-  public FileChunk get() throws IOException {
+  public List<FileChunk> get() throws IOException {
+    List<FileChunk> fileChunks = new ArrayList<>();
     if (useLocalFile) {
       startTime = System.currentTimeMillis();
       finishTime = System.currentTimeMillis();
       state = TajoProtos.FetcherState.FETCH_FINISHED;
-      return fileChunk;
+      fileChunks.add(fileChunk);
+      fileLen = fileChunk.getFile().length();
+      return fileChunks;
+    }
+
+    if (state == FetcherState.FETCH_INIT) {
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
     }
 
     this.startTime = System.currentTimeMillis();
@@ -136,7 +146,7 @@ public class Fetcher {
     ChannelFuture future = null;
     try {
       future = bootstrap.clone().connect(new InetSocketAddress(host, port))
-              .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
 
       // Wait until the connection attempt succeeds or fails.
       Channel channel = future.awaitUninterruptibly().channel();
@@ -154,7 +164,7 @@ public class Fetcher {
       request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
 
       if(LOG.isDebugEnabled()) {
-        LOG.info("Status: " + getState() + ", URI:" + uri);
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
       }
       // Send the HTTP request.
       channel.writeAndFlush(request);
@@ -163,7 +173,18 @@ public class Fetcher {
       channel.closeFuture().syncUninterruptibly();
 
       fileChunk.setLength(fileChunk.getFile().length());
-      return fileChunk;
+
+      long start = 0;
+      for (Long eachChunkLength : chunkLengths) {
+        if (eachChunkLength == 0) continue;
+        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+        chunk.setEbId(fileChunk.getEbId());
+        chunk.setFromRemote(fileChunk.fromRemote());
+        fileChunks.add(chunk);
+        start += eachChunkLength;
+      }
+      return fileChunks;
+
     } finally {
       if(future != null && future.channel().isOpen()){
         // Close the channel to exit.
@@ -226,6 +247,13 @@ public class Fetcher {
                 }
               }
             }
+            if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
+              String chunkLengthsHeader = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
+
+              for (String eachLength : chunkLengthsHeader.split(",")) {
+                chunkLengths.add(Long.parseLong(eachLength));
+              }
+            }
           }
           if (LOG.isDebugEnabled()) {
             LOG.debug(sb.toString());
@@ -296,6 +324,7 @@ public class Fetcher {
       if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
         //channel is closed, but cannot complete fetcher
         finishTime = System.currentTimeMillis();
+        LOG.error("Channel closed by peer: " + ctx.channel());
         state = TajoProtos.FetcherState.FETCH_FAILED;
       }
       IOUtils.cleanup(LOG, fc, raf);
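
Taken together, the Fetcher changes above let one HTTP connection deliver
several logical chunks: the pull server concatenates them into a single local
file and reports each chunk's length in the CHUNK_LENGTH_HEADER_NAME response
header, and get() then carves the downloaded file into (offset, length)
FileChunk windows, skipping zero-length entries. A standalone sketch of that
splitting rule, using plain longs in place of Tajo's FileChunk and an invented
header value:

  import java.util.ArrayList;
  import java.util.List;

  public class ChunkSplitSketch {
    public static void main(String[] args) {
      String header = "8192,0,4096,12288"; // hypothetical chunk-length header value
      List<long[]> windows = new ArrayList<>();
      long startOffset = 0;
      for (String each : header.split(",")) {
        long length = Long.parseLong(each);
        if (length == 0) continue;                    // empty chunks are skipped
        windows.add(new long[]{startOffset, length}); // window into one local file
        startOffset += length;
      }
      for (long[] w : windows) {
        System.out.println("offset=" + w[0] + ", length=" + w[1]);
      }
    }
  }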