You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/12/24 10:18:25 UTC

[2/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.

TAJO-1950: Query master uses too much memory during range shuffle.

Closes #884


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f9ae1da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f9ae1da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f9ae1da

Branch: refs/heads/master
Commit: 1f9ae1da0424731567cea18e975c47d4479b0ae9
Parents: e8ee7f2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 24 18:17:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 24 18:17:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |   4 +-
 .../apache/tajo/master/TestRepartitioner.java   |  81 ++---
 .../org/apache/tajo/worker/TestFetcher.java     |   2 +-
 .../apache/tajo/engine/query/TaskRequest.java   |   6 +-
 .../tajo/engine/query/TaskRequestImpl.java      |  13 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java |  32 --
 .../tajo/querymaster/DefaultTaskScheduler.java  |  13 +-
 .../tajo/querymaster/FetchScheduleEvent.java    |   7 +-
 .../apache/tajo/querymaster/Repartitioner.java  | 206 ++++++++-----
 .../java/org/apache/tajo/querymaster/Stage.java |   3 +-
 .../java/org/apache/tajo/querymaster/Task.java  |  48 ++-
 .../tajo/worker/ExecutionBlockContext.java      |  68 ++++-
 .../java/org/apache/tajo/worker/FetchImpl.java  | 140 ++++-----
 .../java/org/apache/tajo/worker/Fetcher.java    |  45 ++-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 174 ++++++-----
 tajo-core/src/main/proto/ResourceProtos.proto   |  14 +-
 .../src/main/resources/webapps/worker/task.jsp  |  17 +-
 .../tajo/pullserver/HttpDataServerHandler.java  |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  | 302 ++++++++++++++-----
 .../apache/tajo/storage/index/bst/BSTIndex.java |  54 +++-
 22 files changed, 811 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2edf9d7..c02bfc7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
+
     TAJO-1858: Aligning error message in execute query page of web UI is needed.
     (Byunghwa Yun via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9ab3dfa..29cf9ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -212,6 +212,10 @@ public class TajoConf extends Configuration {
 
     // Shuffle Configuration --------------------------------------------------
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
+    PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
+    PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+    PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
+        Validators.min("1")),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index 279fce7..81eeb1f 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -88,7 +88,9 @@ public class TestHAServiceHDFSImpl  {
       assertEquals(2, fs.listStatus(activePath).length);
       assertEquals(0, fs.listStatus(backupPath).length);
     } finally {
-      backupMaster.stop();
+      if (backupMaster != null) {
+        backupMaster.stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index b0a3a17..c260ab6 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -29,13 +29,15 @@ import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.junit.Test;
 
 import java.net.URI;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static junit.framework.Assert.assertEquals;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -70,11 +72,9 @@ public class TestRepartitioner {
             new HashMap<>();
 
     for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
-      FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
+      FetchImpl fetch = new FetchImpl(sid.toString(), new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
           sid, eachEntry.getKey(), eachEntry.getValue());
 
-      fetch.setName(sid.toString());
-
       FetchProto proto = fetch.getProto();
       fetch = new FetchImpl(proto);
       assertEquals(proto, fetch.getProto());
@@ -84,7 +84,7 @@ public class TestRepartitioner {
 
       hashEntries.put(eachEntry.getKey(), ebEntries);
 
-      List<URI> uris = fetch.getURIs();
+      List<URI> uris = Repartitioner.createFullURIs(2 * StorageUnit.KB, fetch.getProto());
       assertEquals(1, uris.size());   //In Hash Suffle, Fetcher return only one URI per partition.
 
       URI uri = uris.get(0);
@@ -119,7 +119,7 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     FetchImpl [] fetches = new FetchImpl[12];
     for (int i = 0; i < 12; i++) {
-      fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
+      fetches[i] = new FetchImpl(tableName, new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
     }
 
     int [] VOLUMES = {100, 80, 70, 30, 10, 5};
@@ -128,37 +128,40 @@ public class TestRepartitioner {
       fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
     }
 
-    Pair<Long [], Map<String, List<FetchImpl>>[]> results;
+    FetchProto[] expectedProtos = new FetchProto[fetches.length];
+    expectedProtos = Arrays.stream(fetches).map(fetch -> fetch.getProto()).collect(Collectors.toList())
+        .toArray(expectedProtos);
+    Pair<Long [], Map<String, List<FetchProto>>[]> results;
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
     long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
     assertFetchVolumes(expected, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
     long expected0 [] = {140, 155};
     assertFetchVolumes(expected0, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
     long expected1 [] = {100, 95, 100};
     assertFetchVolumes(expected1, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
     long expected2 [] = {100, 80, 70, 45};
     assertFetchVolumes(expected2, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
     long expected3 [] = {100, 80, 70, 30, 15};
     assertFetchVolumes(expected3, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
     long expected4 [] = {100, 80, 70, 30, 10, 5};
     assertFetchVolumes(expected4, results.getFirst());
-    assertFetchImpl(fetches, results.getSecond());
+    assertFetchProto(expectedProtos, results.getSecond());
   }
 
   private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -191,7 +194,8 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(6, fetches.size());
 
@@ -199,10 +203,10 @@ public class TestRepartitioner {
     int index = 0;
     int numZeroPosFetcher = 0;
     long totalLength = 0;
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       totalInterms += eachFetchList.size();
       long eachFetchVolume = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         eachFetchVolume += eachFetch.getLength();
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
@@ -248,8 +252,9 @@ public class TestRepartitioner {
       intermediateEntries.add(interm);
     }
 
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(32, fetches.size());
 
@@ -258,15 +263,15 @@ public class TestRepartitioner {
     long totalLength = 0;
     Set<String> uniqPullHost = new HashSet<>();
 
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       long length = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
         }
         totalLength += eachFetch.getLength();
         length += eachFetch.getLength();
-        uniqPullHost.add(eachFetch.getPullHost().toString());
+        uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
       }
       assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
       if (index < fetches.size() - 1) {
@@ -378,7 +383,8 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 256 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, entries, splitVolume,
         10 * 1024 * 1024);
 
 
@@ -393,13 +399,13 @@ public class TestRepartitioner {
         {728355084,121760359},
     };
     int index = 0;
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       if (index == 3) {
         assertEquals(2, eachFetchList.size());
       } else {
         assertEquals(1, eachFetchList.size());
       }
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         assertEquals(expected[index][0], eachFetch.getOffset());
         assertEquals(expected[index][1], eachFetch.getLength());
         index++;
@@ -438,13 +444,14 @@ public class TestRepartitioner {
     }
 
     long splitVolume = 128 * 1024 * 1024;
-    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
         splitVolume, 10 * 1024 * 1024);
     assertEquals(32, fetches.size());
 
     int expectedSize = 0;
-    Set<FetchImpl> fetchSet = new HashSet<>();
-    for(List<FetchImpl> list : fetches){
+    Set<FetchProto> fetchSet = new HashSet<>();
+    for(List<FetchProto> list : fetches){
       expectedSize += list.size();
       fetchSet.addAll(list);
     }
@@ -456,15 +463,15 @@ public class TestRepartitioner {
     long totalLength = 0;
     Set<String> uniqPullHost = new HashSet<>();
 
-    for (List<FetchImpl> eachFetchList: fetches) {
+    for (List<FetchProto> eachFetchList: fetches) {
       long length = 0;
-      for (FetchImpl eachFetch: eachFetchList) {
+      for (FetchProto eachFetch: eachFetchList) {
         if (eachFetch.getOffset() == 0) {
           numZeroPosFetcher++;
         }
         totalLength += eachFetch.getLength();
         length += eachFetch.getLength();
-        uniqPullHost.add(eachFetch.getPullHost().toString());
+        uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
       }
       assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
       if (index < fetches.size() - 1) {
@@ -482,25 +489,25 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     Task.PullHost pullHost = new Task.PullHost("localhost", 0);
 
-    FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
-    FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl expected = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl fetch2 = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
     assertEquals(expected, fetch2);
     fetch2.setOffset(5);
     fetch2.setLength(10);
     assertNotEquals(expected, fetch2);
   }
 
-  private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
-    Set<FetchImpl> expectedURLs = Sets.newHashSet();
+  private static void assertFetchProto(FetchProto [] expected, Map<String, List<FetchProto>>[] result) {
+    Set<FetchProto> expectedURLs = Sets.newHashSet();
 
-    for (FetchImpl f : expected) {
+    for (FetchProto f : expected) {
       expectedURLs.add(f);
     }
 
-    Set<FetchImpl> resultURLs = Sets.newHashSet();
+    Set<FetchProto> resultURLs = Sets.newHashSet();
 
-    for (Map<String, List<FetchImpl>> e : result) {
-      for (List<FetchImpl> list : e.values()) {
+    for (Map<String, List<FetchProto>> e : result) {
+      for (List<FetchProto> list : e.values()) {
         resultURLs.addAll(list);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
index a91fc30..dfc37b0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -90,7 +90,7 @@ public class TestFetcher {
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
     final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    FileChunk chunk = fetcher.get();
+    FileChunk chunk = fetcher.get().get(0);
     assertNotNull(chunk);
     assertNotNull(chunk.getFile());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index 48d4780..ef4ff60 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -21,6 +21,7 @@
  */
 package org.apache.tajo.engine.query;
 
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -29,7 +30,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.util.List;
 
@@ -39,8 +39,8 @@ public interface TaskRequest extends ProtoObject<TaskRequestProto> {
 	List<CatalogProtos.FragmentProto> getFragments();
 	PlanProto.LogicalNodeTree getPlan();
 	void setInterQuery();
-	void addFetch(String name, FetchImpl fetch);
-	List<FetchImpl> getFetches();
+	void addFetch(FetchProto fetch);
+	List<FetchProto> getFetches();
   QueryContext getQueryContext(TajoConf conf);
   DataChannel getDataChannel();
   Enforcer getEnforcer();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index 7b52dab..fc7556c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -42,7 +42,7 @@ public class TaskRequestImpl implements TaskRequest {
 	private boolean clusteredOutput;
 	private PlanProto.LogicalNodeTree plan;     // logical node
 	private Boolean interQuery;
-	private List<FetchImpl> fetches;
+	private List<FetchProto> fetches;
 	private QueryContext queryContext;
 	private DataChannel dataChannel;
 	private Enforcer enforcer;
@@ -157,10 +157,10 @@ public class TaskRequestImpl implements TaskRequest {
 	  this.interQuery = true;
 	}
 
-  public void addFetch(String name, FetchImpl fetch) {
+  @Override
+  public void addFetch(FetchProto fetch) {
     maybeInitBuilder();
     initFetches();
-    fetch.setName(name);
     fetches.add(fetch);
   }
 
@@ -212,7 +212,8 @@ public class TaskRequestImpl implements TaskRequest {
     return this.enforcer;
   }
 
-  public List<FetchImpl> getFetches() {
+  @Override
+  public List<FetchProto> getFetches() {
 	  initFetches();    
 
     return this.fetches;
@@ -225,7 +226,7 @@ public class TaskRequestImpl implements TaskRequest {
     TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
     this.fetches = new ArrayList<>();
     for(FetchProto fetch : p.getFetchesList()) {
-      fetches.add(new FetchImpl(fetch));
+      fetches.add(fetch);
     }
 	}
 
@@ -259,7 +260,7 @@ public class TaskRequestImpl implements TaskRequest {
 		}
     if (this.fetches != null) {
       for (int i = 0; i < fetches.size(); i++) {
-        builder.addFetches(fetches.get(i).getProto());
+        builder.addFetches(fetches.get(i));
       }
     }
     if (this.queryMasterHostAndPort != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index bae04c8..31c23f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.utils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
@@ -29,47 +28,16 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 
 public class TupleUtil {
   private static final Log LOG = LogFactory.getLog(TupleUtil.class);
 
-  public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
-      throws UnsupportedEncodingException {
-    return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema));
-  }
-
-  public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
-      throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-    byte [] firstKeyBytes = encoder.toBytes(range.getStart());
-    byte [] endKeyBytes = encoder.toBytes(range.getEnd());
-
-    String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
-    String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
-
-    sb.append("start=")
-        .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
-        .append("&")
-        .append("end=")
-        .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
-
-    if (last) {
-      sb.append("&final=true");
-    }
-
-    return sb.toString();
-  }
-
   /**
    * if max value is null, set ranges[last]
    * @param sortSpecs

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f1c0f62..26dc103 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -52,7 +52,6 @@ import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -233,11 +232,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+        Map<String, List<FetchProto>> fetches = castEvent.getFetches();
         TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
         Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
         scheduledObjectNum++;
-        for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+        for (Entry<String, List<FetchProto>> eachFetch : fetches.entrySet()) {
           task.addFetches(eachFetch.getKey(), eachFetch.getValue());
           task.addFragment(fragmentsForNonLeafTask[0], true);
           if (fragmentsForNonLeafTask[1] != null) {
@@ -983,11 +982,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
-          for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
-            Collection<FetchImpl> fetches = entry.getValue();
+          for(Map.Entry<String, Set<FetchProto>> entry: task.getFetchMap().entrySet()) {
+            Collection<FetchProto> fetches = entry.getValue();
             if (fetches != null) {
-              for (FetchImpl fetch : fetches) {
-                taskAssign.addFetch(entry.getKey(), fetch);
+              for (FetchProto fetch : fetches) {
+                taskAssign.addFetch(fetch);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
index 5fe2f80..e4e63b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.querymaster;
 
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -26,15 +27,15 @@ import java.util.List;
 import java.util.Map;
 
 public class FetchScheduleEvent extends TaskSchedulerEvent {
-  private final Map<String, List<FetchImpl>> fetches;
+  private final Map<String, List<FetchProto>> fetches;  // map of table name and fetch list
 
   public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
-                            final Map<String, List<FetchImpl>> fetches) {
+                            final Map<String, List<FetchProto>> fetches) {
     super(eventType, blockId);
     this.fetches = fetches;
   }
 
-  public Map<String, List<FetchImpl>> getFetches() {
+  public Map<String, List<FetchProto>> getFetches() {
     return fetches;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index e64cd51..bd8311f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -25,8 +25,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -48,7 +50,9 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
@@ -57,13 +61,17 @@ import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
 import java.net.URI;
+import java.net.URLEncoder;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -75,7 +83,6 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
 public class Repartitioner {
   private static final Log LOG = LogFactory.getLog(Repartitioner.class);
 
-  private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
   private final static String UNKNOWN_HOST = "unknown";
 
   public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
@@ -546,12 +553,13 @@ public class Repartitioner {
 
   private static void addJoinShuffle(Stage stage, int partitionId,
                                      Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
-    Map<String, List<FetchImpl>> fetches = new HashMap<>();
+    Map<String, List<FetchProto>> fetches = new HashMap<>();
     for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
       if (grouppedPartitions.containsKey(execBlock.getId())) {
-        Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
+        String name = execBlock.getId().toString();
+        List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE,
             grouppedPartitions.get(execBlock.getId()));
-        fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
+        fetches.put(name, requests);
       }
     }
 
@@ -568,9 +576,10 @@ public class Repartitioner {
    *
    * @return key: pullserver's address, value: a list of requests
    */
-  private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
-                                                          ShuffleType type,
-                                                          List<IntermediateEntry> partitions) {
+  private static List<FetchProto> mergeShuffleRequest(final String fetchName,
+                                                      final int partitionId,
+                                                      final ShuffleType type,
+                                                      final List<IntermediateEntry> partitions) {
     // ebId + pullhost -> FetchImmpl
     Map<String, FetchImpl> mergedPartitions = new HashMap<>();
 
@@ -582,12 +591,15 @@ public class Repartitioner {
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
       } else {
         // In some cases like union each IntermediateEntry has different EBID.
-        FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId);
+        FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId);
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
         mergedPartitions.put(mergedKey, fetch);
       }
     }
-    return mergedPartitions.values();
+
+    return mergedPartitions.values().stream()
+        .map(fetch -> fetch.getProto())
+        .collect(Collectors.toList());
   }
 
   public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -699,44 +711,44 @@ public class Repartitioner {
         new String[]{UNKNOWN_HOST});
     Stage.scheduleFragment(stage, dummyFragment);
 
-    List<FetchImpl> fetches = new ArrayList<>();
+    Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>();
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
     for (ExecutionBlock childBlock : childBlocks) {
       Stage childExecSM = stage.getContext().getStage(childBlock.getId());
       for (Task qu : childExecSM.getTasks()) {
         for (IntermediateEntry p : qu.getIntermediateData()) {
-          FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
-          fetch.addPart(p.getTaskId(), p.getAttemptId());
-          fetches.add(fetch);
+          Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId());
+          if (fetches.containsKey(key)) {
+            fetches.get(key).addPart(p.getTaskId(), p.getAttemptId());
+          } else {
+            FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+            fetch.addPart(p.getTaskId(), p.getAttemptId());
+            fetches.put(key, fetch);
+          }
         }
       }
     }
 
-    SortedMap<TupleRange, Collection<FetchImpl>> map;
+    SortedMap<TupleRange, Collection<FetchProto>> map;
     map = new TreeMap<>();
 
-    Set<FetchImpl> fetchSet;
-    try {
-      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
-      for (int i = 0; i < ranges.length; i++) {
-        fetchSet = new HashSet<>();
-        for (FetchImpl fetch: fetches) {
-          String rangeParam =
-              TupleUtil.rangeToQuery(ranges[i], i == (ranges.length - 1) , encoder);
-          FetchImpl copy = null;
-          try {
-            copy = fetch.clone();
-          } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
-          }
-          copy.setRangeParams(rangeParam);
-          fetchSet.add(copy);
+    Set<FetchProto> fetchSet;
+    RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
+    for (int i = 0; i < ranges.length; i++) {
+      fetchSet = new HashSet<>();
+      RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder);
+      for (FetchImpl fetch : fetches.values()) {
+        FetchImpl copy = null;
+        try {
+          copy = fetch.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new RuntimeException(e);
         }
-        map.put(ranges[i], fetchSet);
+        copy.setRangeParams(rangeParam);
+        fetchSet.add(copy.getProto());
       }
 
-    } catch (UnsupportedEncodingException e) {
-      LOG.error(e);
+      map.put(ranges[i], fetchSet);
     }
 
     scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
@@ -744,20 +756,20 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
-                                                   String tableName, int num) {
+  public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions,
+                                                 String tableName, int num) {
     int i;
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
     for (i = 0; i < num; i++) {
       fetchesArray[i] = new HashMap<>();
     }
     i = 0;
-    for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
-      Collection<FetchImpl> value = entry.getValue();
+    for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) {
+      Collection<FetchProto> value = entry.getValue();
       TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
       if (i == num) i = 0;
     }
-    for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
+    for (Map<String, List<FetchProto>> eachFetches : fetchesArray) {
       Stage.scheduleFetches(stage, eachFetches);
     }
   }
@@ -785,6 +797,10 @@ public class Repartitioner {
       return totalVolume;
     }
 
+    public List<FetchProto> getFetchProtos() {
+      return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList());
+    }
+
   }
 
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
@@ -821,7 +837,7 @@ public class Repartitioner {
         Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
         for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
 
-          FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+          FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(),
               block.getId(), interm.getKey(), e.getValue());
 
           long volumeSum = 0;
@@ -891,20 +907,16 @@ public class Repartitioner {
     }
   }
 
-  public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
+  public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl(
       Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
 
     // Sort fetchGroupMeta in a descending order of data volumes.
     List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
-    Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
-      @Override
-      public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
-        return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0);
-      }
-    });
+    Collections.sort(fetchGroupMetaList, (o1, o2) ->
+        o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0));
 
     // Initialize containers
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
     Long [] assignedVolumes = new Long[num];
     // initialization
     for (int i = 0; i < num; i++) {
@@ -925,7 +937,7 @@ public class Repartitioner {
         FetchGroupMeta fetchGroupMeta = iterator.next();
         assignedVolumes[p] += fetchGroupMeta.getVolume();
 
-        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
         p++;
       }
 
@@ -933,13 +945,13 @@ public class Repartitioner {
       while (p >= 0 && iterator.hasNext()) {
         FetchGroupMeta fetchGroupMeta = iterator.next();
         assignedVolumes[p] += fetchGroupMeta.getVolume();
-        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
 
         // While the current one is smaller than next one, it adds additional fetches to current one.
         while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) {
           FetchGroupMeta additionalFetchGroup = iterator.next();
           assignedVolumes[p] += additionalFetchGroup.getVolume();
-          TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
+          TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos());
         }
 
         p--;
@@ -951,9 +963,9 @@ public class Repartitioner {
 
   public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
                                                              String tableName, int num) {
-    Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
+    Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
     // Schedule FetchImpls
-    for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
+    for (Map<String, List<FetchProto>> eachFetches : fetchsArray) {
       Stage.scheduleFetches(stage, eachFetches);
     }
   }
@@ -976,7 +988,7 @@ public class Repartitioner {
       throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " +
           "tajo.shuffle.hash.appender.page.volumn-mb");
     }
-    List<List<FetchImpl>> fetches = new ArrayList<>();
+    List<List<FetchProto>> fetches = new ArrayList<>();
 
     long totalIntermediateSize = 0L;
     for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) {
@@ -996,7 +1008,7 @@ public class Repartitioner {
 
       // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
       for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) {
-        List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
+        List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries,
             splitVolume, pageSize);
         if (eachFetches != null && !eachFetches.isEmpty()) {
           fetches.addAll(eachFetches);
@@ -1007,8 +1019,8 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(fetches.size());
 
     int i = 0;
-    Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
-    for(List<FetchImpl> entry : fetches) {
+    Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()];
+    for(List<FetchProto> entry : fetches) {
       fetchesArray[i] = new HashMap<>();
       fetchesArray[i].put(tableName, entry);
 
@@ -1030,16 +1042,16 @@ public class Repartitioner {
    * @param splitVolume
    * @return
    */
-  public static List<List<FetchImpl>> splitOrMergeIntermediates(
+  public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull  String fetchName,
       ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
     // Each List<FetchImpl> has splitVolume size.
-    List<List<FetchImpl>> fetches = new ArrayList<>();
+    List<List<FetchProto>> fetches = new ArrayList<>();
 
     Iterator<IntermediateEntry> iter = entries.iterator();
     if (!iter.hasNext()) {
       return null;
     }
-    List<FetchImpl> fetchListForSingleTask = new ArrayList<>();
+    List<FetchProto> fetchListForSingleTask = new ArrayList<>();
     long fetchListVolume = 0;
 
     while (iter.hasNext()) {
@@ -1065,11 +1077,11 @@ public class Repartitioner {
           fetchListForSingleTask = new ArrayList<>();
           fetchListVolume = 0;
         }
-        FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+        FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
             ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
         fetch.setOffset(eachSplit.getFirst());
         fetch.setLength(eachSplit.getSecond());
-        fetchListForSingleTask.add(fetch);
+        fetchListForSingleTask.add(fetch.getProto());
         fetchListVolume += eachSplit.getSecond();
       }
     }
@@ -1079,19 +1091,56 @@ public class Repartitioner {
     return fetches;
   }
 
-  public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
+  /**
+   * Get the pull server URIs.
+   */
+  public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) {
+    return createFetchURL(maxUrlLength, fetch, true);
+  }
+
+  /**
+   * Get the pull server URIs without repeated parameters.
+   */
+  public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) {
+    return createFetchURL(maxUrlLength, fetch, false);
+  }
+
+  private static String getRangeParam(FetchProto proto) {
+    StringBuilder sb = new StringBuilder();
+    String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray()));
+    String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray()));
+
+    try {
+      sb.append("start=")
+          .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
+          .append("&")
+          .append("end=")
+          .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (proto.getRangeLastInclusive()) {
+      sb.append("&final=true");
+    }
+
+    return sb.toString();
+  }
+
+  public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
     String scheme = "http://";
 
     StringBuilder urlPrefix = new StringBuilder(scheme);
-    urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
-        .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
-        .append("&sid=").append(fetch.getExecutionBlockId().getId())
+    ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
+    urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
+        .append("qid=").append(ebId.getQueryId().toString())
+        .append("&sid=").append(ebId.getId())
         .append("&p=").append(fetch.getPartitionId())
         .append("&type=");
     if (fetch.getType() == HASH_SHUFFLE) {
       urlPrefix.append("h");
     } else if (fetch.getType() == RANGE_SHUFFLE) {
-      urlPrefix.append("r").append("&").append(fetch.getRangeParams());
+      urlPrefix.append("r").append("&").append(getRangeParam(fetch));
     } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
       urlPrefix.append("s");
     }
@@ -1105,17 +1154,26 @@ public class Repartitioner {
       if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
         fetchURLs.add(URI.create(urlPrefix.toString()));
       } else {
+        urlPrefix.append("&ta=");
         // If the get request is longer than 2000 characters,
         // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
         // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
         // The below code transforms a long request to multiple requests.
         List<String> taskIdsParams = new ArrayList<>();
         StringBuilder taskIdListBuilder = new StringBuilder();
-        List<Integer> taskIds = fetch.getTaskIds();
-        List<Integer> attemptIds = fetch.getAttemptIds();
+        
+        final List<Integer> taskIds = fetch.getTaskIdList();
+        final List<Integer> attemptIds = fetch.getAttemptIdList();
+
+        // Sort task ids to increase cache hit in pull server
+        final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
+            .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+            .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+            .collect(Collectors.toList());
+
         boolean first = true;
 
-        for (int i = 0; i < taskIds.size(); i++) {
+        for (int i = 0; i < taskAndAttemptIds.size(); i++) {
           StringBuilder taskAttemptId = new StringBuilder();
 
           if (!first) { // when comma is added?
@@ -1124,17 +1182,16 @@ public class Repartitioner {
             first = false;
           }
 
-          int taskId = taskIds.get(i);
+          int taskId = taskAndAttemptIds.get(i).getFirst();
           if (taskId < 0) {
             // In the case of hash shuffle each partition has single shuffle file per worker.
             // TODO If file is large, consider multiple fetching(shuffle file can be split)
             continue;
           }
-          int attemptId = attemptIds.get(i);
+          int attemptId = taskAndAttemptIds.get(i).getSecond();
           taskAttemptId.append(taskId).append("_").append(attemptId);
 
-          if (taskIdListBuilder.length() + taskAttemptId.length()
-              > HTTP_REQUEST_MAXIMUM_LENGTH) {
+          if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
             taskIdsParams.add(taskIdListBuilder.toString());
             taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
           } else {
@@ -1145,7 +1202,6 @@ public class Repartitioner {
         if (taskIdListBuilder.length() > 0) {
           taskIdsParams.add(taskIdListBuilder.toString());
         }
-        urlPrefix.append("&ta=");
         for (String param : taskIdsParams) {
           fetchURLs.add(URI.create(urlPrefix + param));
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 254df64..5f050bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -65,7 +65,6 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -1189,7 +1188,7 @@ public class Stage implements EventHandler<StageEvent> {
         stage.getId(), leftFragment, rightFragments));
   }
 
-  public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+  public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) {
     stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
         stage.getId(), fetches));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 95a7170..9d038ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
@@ -46,7 +47,6 @@ import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
 
 import java.net.URI;
 import java.util.*;
@@ -55,8 +55,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.ResourceProtos.*;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
 public class Task implements EventHandler<TaskEvent> {
   /** Class Logger */
@@ -70,7 +70,7 @@ public class Task implements EventHandler<TaskEvent> {
 	private List<ScanNode> scan;
 	
 	private Map<String, Set<FragmentProto>> fragMap;
-	private Map<String, Set<FetchImpl>> fetchMap;
+	private Map<String, Set<FetchProto>> fetchMap;
 
   private int totalFragmentNum;
 
@@ -100,6 +100,8 @@ public class Task implements EventHandler<TaskEvent> {
 
   private TaskHistory finalTaskHistory;
 
+  private final int maxUrlLength;
+
   protected static final StateMachineFactory
       <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
       new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
@@ -207,6 +209,8 @@ public class Task implements EventHandler<TaskEvent> {
 
     stateMachine = stateMachineFactory.make(this);
     totalFragmentNum = 0;
+    maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+        ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
 	}
 
   public boolean isLeafTask() {
@@ -282,9 +286,9 @@ public class Task implements EventHandler<TaskEvent> {
     taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
 
     List<String[]> fetchList = new ArrayList<>();
-    for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
-      for (FetchImpl f : e.getValue()) {
-        for (URI uri : f.getSimpleURIs()){
+    for (Map.Entry<String, Set<FetchProto>> e : getFetchMap().entrySet()) {
+      for (FetchProto f : e.getValue()) {
+        for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)) {
           fetchList.add(new String[] {e.getKey(), uri.toString()});
         }
       }
@@ -364,8 +368,8 @@ public class Task implements EventHandler<TaskEvent> {
     return succeededWorker;
   }
 	
-	public void addFetches(String tableId, Collection<FetchImpl> fetches) {
-	  Set<FetchImpl> fetchSet;
+	public void addFetches(String tableId, Collection<FetchProto> fetches) {
+	  Set<FetchProto> fetchSet;
     if (fetchMap.containsKey(tableId)) {
       fetchSet = fetchMap.get(tableId);
     } else {
@@ -375,7 +379,7 @@ public class Task implements EventHandler<TaskEvent> {
     fetchMap.put(tableId, fetchSet);
 	}
 	
-	public void setFetches(Map<String, Set<FetchImpl>> fetches) {
+	public void setFetches(Map<String, Set<FetchProto>> fetches) {
 	  this.fetchMap.clear();
 	  this.fetchMap.putAll(fetches);
 	}
@@ -395,27 +399,15 @@ public class Task implements EventHandler<TaskEvent> {
 	public TaskId getId() {
 		return taskId;
 	}
-	
-	public Collection<FetchImpl> getFetchHosts(String tableId) {
-	  return fetchMap.get(tableId);
-	}
-	
-	public Collection<Set<FetchImpl>> getFetches() {
+
+	public Collection<Set<FetchProto>> getFetches() {
 	  return fetchMap.values();
 	}
 
-  public Map<String, Set<FetchImpl>> getFetchMap() {
+  public Map<String, Set<FetchProto>> getFetchMap() {
     return fetchMap;
   }
-	
-	public Collection<FetchImpl> getFetch(ScanNode scan) {
-	  return this.fetchMap.get(scan.getTableName());
-	}
-	
-	public ScanNode[] getScanNodes() {
-	  return this.scan.toArray(new ScanNode[scan.size()]);
-	}
-	
+
 	@Override
 	public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -426,10 +418,10 @@ public class Task implements EventHandler<TaskEvent> {
         builder.append(fragment).append(", ");
       }
 		}
-		for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+		for (Entry<String, Set<FetchProto>> e : fetchMap.entrySet()) {
       builder.append(e.getKey()).append(" : ");
-      for (FetchImpl t : e.getValue()) {
-        for (URI uri : t.getURIs()){
+      for (FetchProto t : e.getValue()) {
+        for (URI uri : Repartitioner.createFullURIs(maxUrlLength, t)){
           builder.append(uri).append(" ");
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 65cb6ac..098567a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,6 +20,12 @@ package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,20 +38,20 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.AsyncRpcClient;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -191,10 +197,64 @@ public class ExecutionBlockContext {
     }
     tasks.clear();
     taskHistories.clear();
+
+    // Clear index cache in pull server
+    clearIndexCache();
+
     resource.release();
     RpcClientManager.cleanup(queryMasterClient);
   }
 
+  /**
+   * Send a request to {@link TajoPullServerService} to clear index cache
+   */
+  private void clearIndexCache() {
+    // Avoid unnecessary cache clear request when the current eb is a leaf eb
+    if (executionBlockId.getId() > 1) {
+      Bootstrap bootstrap = new Bootstrap()
+          .group(NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, 1))
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+          .option(ChannelOption.TCP_NODELAY, true);
+      ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
+        @Override
+        protected void initChannel(Channel channel) throws Exception {
+          ChannelPipeline pipeline = channel.pipeline();
+          pipeline.addLast("codec", new HttpClientCodec());
+        }
+      };
+      bootstrap.handler(initializer);
+
+      WorkerConnectionInfo connInfo = workerContext.getConnectionInfo();
+      ChannelFuture future = bootstrap.connect(new InetSocketAddress(connInfo.getHost(), connInfo.getPullServerPort()))
+          .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+      try {
+        Channel channel = future.await().channel();
+        if (!future.isSuccess()) {
+          // Upon failure to connect to pull server, cache clear message is just ignored.
+          LOG.warn(future.cause());
+          return;
+        }
+
+        // Example of URI: /ebid=eb_1450063997899_0015_000002
+        ExecutionBlockId clearEbId = new ExecutionBlockId(executionBlockId.getQueryId(), executionBlockId.getId() - 1);
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, "ebid=" + clearEbId.toString());
+        request.headers().set(Names.HOST, connInfo.getHost());
+        request.headers().set(Names.CONNECTION, Values.CLOSE);
+        channel.writeAndFlush(request);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (future != null && future.channel().isOpen()) {
+          // Close the channel to exit.
+          future.channel().close();
+        }
+      }
+    }
+  }
+
   public TajoConf getConf() {
     return systemConf;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 7d2033c..b49d449 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -21,16 +21,21 @@ package org.apache.tajo.worker;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 
-import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 
@@ -42,8 +47,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
   private ShuffleType type; // hash or range partition method.
   private ExecutionBlockId executionBlockId;   // The executionBlock id
   private int partitionId;                     // The hash partition id
-  private String name;                         // The intermediate source name
-  private String rangeParams;                  // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+  private final String name;                   // The intermediate source name
+  private RangeParam rangeParam;               // optional, range parameter for range shuffle
   private boolean hasNext = false;             // optional, if true, has more taskIds
 
   private List<Integer> taskIds;               // repeated, the task ids
@@ -52,19 +57,48 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
   private long offset = -1;
   private long length = -1;
 
-  public FetchImpl() {
-    taskIds = new ArrayList<>();
-    attemptIds = new ArrayList<>();
+  public static class RangeParam {
+    private byte[] start;
+    private byte[] end;
+    private boolean lastInclusive;
+
+    public RangeParam(TupleRange range, boolean lastInclusive, RowStoreEncoder encoder) {
+      this.start = encoder.toBytes(range.getStart());
+      this.end = encoder.toBytes(range.getEnd());
+      this.lastInclusive = lastInclusive;
+    }
+
+    public RangeParam(byte[] start, byte[] end, boolean lastInclusive) {
+      this.start = start;
+      this.end = end;
+      this.lastInclusive = lastInclusive;
+    }
+
+    public byte[] getStart() {
+      return start;
+    }
+
+    public byte[] getEnd() {
+      return end;
+    }
+
+    public boolean isLastInclusive() {
+      return lastInclusive;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(start), Arrays.hashCode(end), lastInclusive);
+    }
   }
 
   public FetchImpl(FetchProto proto) {
-    this(new Task.PullHost(proto.getHost(), proto.getPort()),
+    this(proto.getName(),
+        new Task.PullHost(proto.getHost(), proto.getPort()),
         proto.getType(),
         new ExecutionBlockId(proto.getExecutionBlockId()),
         proto.getPartitionId(),
-        proto.getRangeParams(),
         proto.getHasNext(),
-        proto.getName(),
         proto.getTaskIdList(), proto.getAttemptIdList());
 
     if (proto.hasOffset()) {
@@ -74,31 +108,41 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     if (proto.hasLength()) {
       this.length = proto.getLength();
     }
+
+    if (proto.hasRangeStart()) {
+      this.rangeParam = new RangeParam(proto.getRangeStart().toByteArray(),
+          proto.getRangeEnd().toByteArray(), proto.getRangeLastInclusive());
+    }
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                    int partitionId) {
-    this(host, type, executionBlockId, partitionId, null, false, null,
-            new ArrayList<>(), new ArrayList<>());
+    this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>());
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                    int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
-    this(host, type, executionBlockId, partitionId, null, false, null,
+    this(name, host, type, executionBlockId, partitionId, null, false,
             new ArrayList<>(), new ArrayList<>());
     for (Task.IntermediateEntry entry : intermediateEntryList){
       addPart(entry.getTaskId(), entry.getAttemptId());
     }
   }
 
-  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
-                   int partitionId, String rangeParams, boolean hasNext, String name,
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, boolean hasNext,
+                   List<Integer> taskIds, List<Integer> attemptIds) {
+    this(name, host, type, executionBlockId, partitionId, null, hasNext, taskIds, attemptIds);
+  }
+
+  public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, RangeParam rangeParam, boolean hasNext,
                    List<Integer> taskIds, List<Integer> attemptIds) {
     this.host = host;
     this.type = type;
     this.executionBlockId = executionBlockId;
     this.partitionId = partitionId;
-    this.rangeParams = rangeParams;
+    this.rangeParam = rangeParam;
     this.hasNext = hasNext;
     this.name = name;
     this.taskIds = taskIds;
@@ -107,7 +151,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
+    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParam.hashCode(),
         hasNext, taskIds, attemptIds, offset, length);
   }
 
@@ -123,11 +167,14 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     builder.setHasNext(hasNext);
     builder.setName(name);
 
-    if (rangeParams != null && !rangeParams.isEmpty()) {
-      builder.setRangeParams(rangeParams);
+    if (rangeParam != null) {
+      builder.setRangeStart(ByteString.copyFrom(rangeParam.getStart()));
+      builder.setRangeEnd(ByteString.copyFrom(rangeParam.getEnd()));
+      builder.setRangeLastInclusive(rangeParam.isLastInclusive());
     }
 
     Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+
     builder.addAllTaskId(taskIds);
     builder.addAllAttemptId(attemptIds);
 
@@ -141,10 +188,6 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.attemptIds.add(attemptId);
   }
 
-  public Task.PullHost getPullHost() {
-    return this.host;
-  }
-
   public ExecutionBlockId getExecutionBlockId() {
     return executionBlockId;
   }
@@ -153,20 +196,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.executionBlockId = executionBlockId;
   }
 
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(int partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  public String getRangeParams() {
-    return rangeParams;
-  }
-
-  public void setRangeParams(String rangeParams) {
-    this.rangeParams = rangeParams;
+  public void setRangeParams(RangeParam rangeParams) {
+    this.rangeParam = rangeParams;
   }
 
   public boolean hasNext() {
@@ -185,36 +216,10 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     this.type = type;
   }
 
-  /**
-   * Get the pull server URIs.
-   */
-  public List<URI> getURIs(){
-    return Repartitioner.createFetchURL(this, true);
-  }
-
-  /**
-   * Get the pull server URIs without repeated parameters.
-   */
-  public List<URI> getSimpleURIs(){
-    return Repartitioner.createFetchURL(this, false);
-  }
-
   public String getName() {
     return name;
   }
 
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public List<Integer> getTaskIds() {
-    return taskIds;
-  }
-
-  public List<Integer> getAttemptIds() {
-    return attemptIds;
-  }
-
   public long getOffset() {
     return offset;
   }
@@ -238,8 +243,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
     newFetchImpl.type = type;
     newFetchImpl.executionBlockId = executionBlockId;
     newFetchImpl.partitionId = partitionId;
-    newFetchImpl.name = name;
-    newFetchImpl.rangeParams = rangeParams;
+    newFetchImpl.rangeParam = rangeParam;
     newFetchImpl.hasNext = hasNext;
     if (taskIds != null) {
       newFetchImpl.taskIds = Lists.newArrayList(taskIds);
@@ -269,7 +273,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
         TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
         TUtil.checkEquals(host, fetch.host) &&
         TUtil.checkEquals(name, fetch.name) &&
-        TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+        TUtil.checkEquals(rangeParam, fetch.rangeParam) &&
         TUtil.checkEquals(taskIds, fetch.taskIds) &&
         TUtil.checkEquals(type, fetch.type) &&
         TUtil.checkEquals(offset, fetch.offset) &&

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index b5abffe..250b4cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
 
@@ -42,6 +44,8 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -67,6 +71,7 @@ public class Fetcher {
   private TajoProtos.FetcherState state;
 
   private Bootstrap bootstrap;
+  private List<Long> chunkLengths = new ArrayList<>();
 
   public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
     this.uri = uri;
@@ -97,9 +102,6 @@ public class Fetcher {
               conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
           .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
           .option(ChannelOption.TCP_NODELAY, true);
-
-      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
-      bootstrap.handler(initializer);
     }
   }
 
@@ -123,12 +125,20 @@ public class Fetcher {
     return messageReceiveCount;
   }
 
-  public FileChunk get() throws IOException {
+  public List<FileChunk> get() throws IOException {
+    List<FileChunk> fileChunks = new ArrayList<>();
     if (useLocalFile) {
       startTime = System.currentTimeMillis();
       finishTime = System.currentTimeMillis();
       state = TajoProtos.FetcherState.FETCH_FINISHED;
-      return fileChunk;
+      fileChunks.add(fileChunk);
+      fileLen = fileChunk.getFile().length();
+      return fileChunks;
+    }
+
+    if (state == FetcherState.FETCH_INIT) {
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
     }
 
     this.startTime = System.currentTimeMillis();
@@ -136,7 +146,7 @@ public class Fetcher {
     ChannelFuture future = null;
     try {
       future = bootstrap.clone().connect(new InetSocketAddress(host, port))
-              .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
 
       // Wait until the connection attempt succeeds or fails.
       Channel channel = future.awaitUninterruptibly().channel();
@@ -154,7 +164,7 @@ public class Fetcher {
       request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
 
       if(LOG.isDebugEnabled()) {
-        LOG.info("Status: " + getState() + ", URI:" + uri);
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
       }
       // Send the HTTP request.
       channel.writeAndFlush(request);
@@ -163,7 +173,18 @@ public class Fetcher {
       channel.closeFuture().syncUninterruptibly();
 
       fileChunk.setLength(fileChunk.getFile().length());
-      return fileChunk;
+
+      long start = 0;
+      for (Long eachChunkLength : chunkLengths) {
+        if (eachChunkLength == 0) continue;
+        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+        chunk.setEbId(fileChunk.getEbId());
+        chunk.setFromRemote(fileChunk.fromRemote());
+        fileChunks.add(chunk);
+        start += eachChunkLength;
+      }
+      return fileChunks;
+
     } finally {
       if(future != null && future.channel().isOpen()){
         // Close the channel to exit.
@@ -226,6 +247,13 @@ public class Fetcher {
                 }
               }
             }
+            if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
+              String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
+
+              for (String eachSplit : stringOffset.split(",")) {
+                chunkLengths.add(Long.parseLong(eachSplit));
+              }
+            }
           }
           if (LOG.isDebugEnabled()) {
             LOG.debug(sb.toString());
@@ -296,6 +324,7 @@ public class Fetcher {
       if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
         //channel is closed, but cannot complete fetcher
         finishTime = System.currentTimeMillis();
+        LOG.error("Channel closed by peer: " + ctx.channel());
         state = TajoProtos.FetcherState.FETCH_FAILED;
       }
       IOUtils.cleanup(LOG, fc, raf);