You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:10 UTC

[12/48] git commit: TAJO-789: Improve shuffle URI. (jinho)

TAJO-789: Improve shuffle URI. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8321d263
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8321d263
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8321d263

Branch: refs/heads/window_function
Commit: 8321d263e39987aa3bd075c8da163ee093cad420
Parents: 71f394d
Author: jinossy <ji...@gmail.com>
Authored: Mon Apr 28 20:22:40 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Apr 28 20:22:40 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/engine/query/QueryUnitRequest.java     |   6 +-
 .../tajo/engine/query/QueryUnitRequestImpl.java |  39 ++--
 .../tajo/master/DefaultTaskScheduler.java       |  10 +-
 .../apache/tajo/master/FetchScheduleEvent.java  |   8 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   8 +-
 .../apache/tajo/master/ScheduledFetches.java    |  11 +-
 .../tajo/master/querymaster/QueryUnit.java      | 103 +++++----
 .../master/querymaster/QueryUnitAttempt.java    |   4 +-
 .../tajo/master/querymaster/Repartitioner.java  | 224 +++++++++----------
 .../tajo/master/querymaster/SubQuery.java       |   4 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  | 201 +++++++++++++++++
 .../java/org/apache/tajo/worker/Fetcher.java    |   2 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  39 ++--
 .../src/main/proto/TajoWorkerProtocol.proto     |  20 +-
 .../main/resources/webapps/worker/queryunit.jsp |   9 +-
 .../apache/tajo/master/TestRepartitioner.java   |  21 +-
 17 files changed, 475 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 13f908a..2134b54 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-789: Improve shuffle URI. (jinho)
+
     TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
 
     TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
index 383a787..dc9a63d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -27,8 +27,8 @@ import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.FetchImpl;
 
-import java.net.URI;
 import java.util.List;
 
 public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
@@ -40,8 +40,8 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
 	public String getSerializedData();
 	public boolean isInterQuery();
 	public void setInterQuery();
-	public void addFetch(String name, URI uri);
-	public List<TajoWorkerProtocol.Fetch> getFetches();
+	public void addFetch(String name, FetchImpl fetch);
+	public List<FetchImpl> getFetches();
   public boolean shouldDie();
   public void setShouldDie();
   public QueryContext getQueryContext();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index d4006e0..f1af2ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -21,11 +21,11 @@ package org.apache.tajo.engine.query;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
+import org.apache.tajo.worker.FetchImpl;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,7 +40,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	private boolean clusteredOutput;
 	private String serializedData;     // logical node
 	private Boolean interQuery;
-	private List<Fetch> fetches;
+	private List<FetchImpl> fetches;
   private Boolean shouldDie;
   private QueryContext queryContext;
   private DataChannel dataChannel;
@@ -177,16 +177,13 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	  maybeInitBuilder();
 	  this.interQuery = true;
 	}
-	
-	public void addFetch(String name, URI uri) {
-	  maybeInitBuilder();
-	  initFetches();
-	  fetches.add(
-	  Fetch.newBuilder()
-	    .setName(name)
-	    .setUrls(uri.toString()).build());
-	  
-	}
+
+  public void addFetch(String name, FetchImpl fetch) {
+    maybeInitBuilder();
+    initFetches();
+    fetch.setName(name);
+    fetches.add(fetch);
+  }
 
   public QueryContext getQueryContext() {
     QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -236,7 +233,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     return this.enforcer;
   }
 
-  public List<Fetch> getFetches() {
+  public List<FetchImpl> getFetches() {
 	  initFetches();    
 
     return this.fetches;
@@ -247,9 +244,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
       return;
     }
     QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    this.fetches = new ArrayList<Fetch>();
-    for(Fetch fetch : p.getFetchesList()) {
-      fetches.add(fetch);
+    this.fetches = new ArrayList<FetchImpl>();
+    for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) {
+      fetches.add(new FetchImpl(fetch));
     }
 	}
 
@@ -300,9 +297,11 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		if (this.interQuery != null) {
 		  builder.setInterQuery(this.interQuery);
 		}
-		if (this.fetches != null) {
-		  builder.addAllFetches(this.fetches);
-		}
+    if (this.fetches != null) {
+      for (int i = 0; i < fetches.size(); i++) {
+        builder.addFetches(fetches.get(i).getProto());
+      }
+    }
     if (this.shouldDie != null) {
       builder.setShouldDie(this.shouldDie);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 9978670..5bfac8b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -43,8 +43,8 @@ import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
 
-import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -202,11 +202,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        Map<String, List<URI>> fetches = castEvent.getFetches();
+        Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
         QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
         QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
         scheduledObjectNum++;
-        for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+        for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
           task.addFetches(eachFetch.getKey(), eachFetch.getValue());
           task.addFragment(fragmentsForNonLeafTask[0], true);
           if (fragmentsForNonLeafTask[1] != null) {
@@ -874,9 +874,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             taskAssign.setInterQuery();
           }
           for (ScanNode scan : task.getScanNodes()) {
-            Collection<URI> fetches = task.getFetch(scan);
+            Collection<FetchImpl> fetches = task.getFetch(scan);
             if (fetches != null) {
-              for (URI fetch : fetches) {
+              for (FetchImpl fetch : fetches) {
                 taskAssign.addFetch(scan.getTableName(), fetch);
               }
             }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
index 561f980..21e376c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -20,21 +20,21 @@ package org.apache.tajo.master;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.worker.FetchImpl;
 
-import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
 public class FetchScheduleEvent extends TaskSchedulerEvent {
-  private final Map<String, List<URI>> fetches;
+  private final Map<String, List<FetchImpl>> fetches;
 
   public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
-                            final Map<String, List<URI>> fetches) {
+                            final Map<String, List<FetchImpl>> fetches) {
     super(eventType, blockId);
     this.fetches = fetches;
   }
 
-  public Map<String, List<URI>> getFetches() {
+  public Map<String, List<FetchImpl>> getFetches() {
     return fetches;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index dd82f28..6552998 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -40,9 +40,9 @@ import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
@@ -484,11 +484,11 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
     }
 
     if (!context.isLeafQuery()) {
-      Map<String, List<URI>> fetch = scheduledFetches.getNextFetch();
+      Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch();
       scheduledFetches.popNextFetch();
 
-      for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) {
-        for (URI eachValue : fetchEntry.getValue()) {
+      for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) {
+        for (FetchImpl eachValue : fetchEntry.getValue()) {
           taskAssign.addFetch(fetchEntry.getKey(), eachValue);
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
index 9b7dc22..b05572b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
@@ -18,15 +18,16 @@
 
 package org.apache.tajo.master;
 
-import java.net.URI;
+import org.apache.tajo.worker.FetchImpl;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 public class ScheduledFetches {
-  private List<Map<String, List<URI>>> fetches = new ArrayList<Map<String, List<URI>>>();
+  private List<Map<String, List<FetchImpl>>> fetches = new ArrayList<Map<String, List<FetchImpl>>>();
 
-  public void addFetch(Map<String, List<URI>> fetch) {
+  public void addFetch(Map<String, List<FetchImpl>> fetch) {
     this.fetches.add(fetch);
   }
 
@@ -34,11 +35,11 @@ public class ScheduledFetches {
     return fetches.size() > 0;
   }
 
-  public Map<String, List<URI>> getNextFetch() {
+  public Map<String, List<FetchImpl>> getNextFetch() {
     return hasNextFetch() ? fetches.get(0) : null;
   }
 
-  public Map<String, List<URI>> popNextFetch() {
+  public Map<String, List<FetchImpl>> popNextFetch() {
     return hasNextFetch() ? fetches.remove(0) : null;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 34686da..27625b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -40,6 +40,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.FetchImpl;
 
 import java.net.URI;
 import java.util.*;
@@ -63,7 +64,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	private List<ScanNode> scan;
 	
 	private Map<String, Set<FragmentProto>> fragMap;
-	private Map<String, Set<URI>> fetchMap;
+	private Map<String, Set<FetchImpl>> fetchMap;
 
   private int totalFragmentNum;
 
@@ -269,18 +270,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return succeededHost;
   }
 	
-	public void addFetches(String tableId, Collection<URI> urilist) {
-	  Set<URI> uris;
+	public void addFetches(String tableId, Collection<FetchImpl> fetches) {
+	  Set<FetchImpl> fetchSet;
     if (fetchMap.containsKey(tableId)) {
-      uris = fetchMap.get(tableId);
+      fetchSet = fetchMap.get(tableId);
     } else {
-      uris = Sets.newHashSet();
+      fetchSet = Sets.newHashSet();
     }
-    uris.addAll(urilist);
-    fetchMap.put(tableId, uris);
+    fetchSet.addAll(fetches);
+    fetchMap.put(tableId, fetchSet);
 	}
 	
-	public void setFetches(Map<String, Set<URI>> fetches) {
+	public void setFetches(Map<String, Set<FetchImpl>> fetches) {
 	  this.fetchMap.clear();
 	  this.fetchMap.putAll(fetches);
 	}
@@ -301,19 +302,19 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 		return taskId;
 	}
 	
-	public Collection<URI> getFetchHosts(String tableId) {
+	public Collection<FetchImpl> getFetchHosts(String tableId) {
 	  return fetchMap.get(tableId);
 	}
 	
-	public Collection<Set<URI>> getFetches() {
+	public Collection<Set<FetchImpl>> getFetches() {
 	  return fetchMap.values();
 	}
 
-  public Map<String, Set<URI>> getFetchMap() {
+  public Map<String, Set<FetchImpl>> getFetchMap() {
     return fetchMap;
   }
 	
-	public Collection<URI> getFetch(ScanNode scan) {
+	public Collection<FetchImpl> getFetch(ScanNode scan) {
 	  return this.fetchMap.get(scan.getTableName());
 	}
 	
@@ -323,21 +324,24 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	
 	@Override
 	public String toString() {
-		String str = new String(plan.getType() + " \n");
+    StringBuilder builder = new StringBuilder();
+    builder.append(plan.getType() + " \n");
 		for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
-		  str += e.getKey() + " : ";
+		  builder.append(e.getKey()).append(" : ");
       for (FragmentProto fragment : e.getValue()) {
-        str += fragment + ", ";
+        builder.append(fragment).append(", ");
       }
 		}
-		for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
-      str += e.getKey() + " : ";
-      for (URI t : e.getValue()) {
-        str += t + " ";
+		for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+      builder.append(e.getKey()).append(" : ");
+      for (FetchImpl t : e.getValue()) {
+        for (URI uri : t.getURIs()){
+          builder.append(uri).append(" ");
+        }
       }
     }
 		
-		return str;
+		return builder.toString();
 	}
 	
 	public void setStats(TableStats stats) {
@@ -612,20 +616,52 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return this.intermediateData;
   }
 
+  public static class PullHost {
+    String host;
+    int port;
+    public PullHost(String pullServerAddr, int pullServerPort){
+      this.host = pullServerAddr;
+      this.port = pullServerPort;
+    }
+    public String getHost() {
+      return host;
+    }
+
+    public int getPort() {
+      return this.port;
+    }
+
+    public String getPullAddress() {
+      return host + ":" + port;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(host, port);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof PullHost) {
+        PullHost other = (PullHost) obj;
+        return host.equals(other.host) && port == other.port;
+      }
+
+      return false;
+    }
+  }
+
   public static class IntermediateEntry {
     int taskId;
     int attemptId;
     int partId;
-    String pullHost;
-    int port;
+    PullHost host;
 
-    public IntermediateEntry(int taskId, int attemptId, int partId,
-                             String pullServerAddr, int pullServerPort) {
+    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
       this.taskId = taskId;
       this.attemptId = attemptId;
       this.partId = partId;
-      this.pullHost = pullServerAddr;
-      this.port = pullServerPort;
+      this.host = host;
     }
 
     public int getTaskId() {
@@ -640,22 +676,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       return this.partId;
     }
 
-    public String getPullHost() {
-      return this.pullHost;
-    }
-
-    public int getPullPort() {
-      return port;
-    }
-
-    public String getPullAddress() {
-      return pullHost + ":" + port;
+    public PullHost getPullHost() {
+      return this.host;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(taskId, attemptId, partId, pullHost, port);
+      return Objects.hashCode(taskId, partId, attemptId, host);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index b69742c..c3aae67 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -33,6 +33,7 @@ import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -273,9 +274,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     if (report.getShuffleFileOutputsCount() > 0) {
       this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
 
+      PullHost host = new PullHost(getHost(), getPullServerPort());
       for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
         IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
-            getId().getId(), p.getPartId(), getHost(), getPullServerPort());
+            getId().getId(), p.getPartId(), host);
         partitions.add(entry);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 31d433d..3a2e79f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -39,6 +39,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -47,6 +48,7 @@ import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -55,7 +57,6 @@ import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
 
@@ -66,7 +67,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
 public class Repartitioner {
   private static final Log LOG = LogFactory.getLog(Repartitioner.class);
 
-  private static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+  private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
   private final static String UNKNOWN_HOST = "unknown";
 
   public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
@@ -282,23 +283,16 @@ public class Repartitioner {
 
   private static void addJoinShuffle(SubQuery subQuery, int partitionId,
                                      Map<String, List<IntermediateEntry>> grouppedPartitions) {
-    Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
+    Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>();
     for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
-      Map<String, List<IntermediateEntry>> requests;
+      Collection<FetchImpl> requests;
       if (grouppedPartitions.containsKey(execBlock.getId().toString())) {
-          requests = mergeHashShuffleRequest(grouppedPartitions.get(execBlock.getId().toString()));
+          requests = mergeShuffleRequest(execBlock.getId(), partitionId, HASH_SHUFFLE,
+              grouppedPartitions.get(execBlock.getId().toString()));
       } else {
         return;
       }
-      Set<URI> fetchURIs = TUtil.newHashSet();
-      for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
-        Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
-            execBlock.getId(),
-            partitionId, HASH_SHUFFLE,
-            requestPerNode.getValue());
-        fetchURIs.addAll(uris);
-      }
-      fetches.put(execBlock.getId().toString(), Lists.newArrayList(fetchURIs));
+      fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
     }
     SubQuery.scheduleFetches(subQuery, fetches);
   }
@@ -309,17 +303,23 @@ public class Repartitioner {
    *
    * @return key: pullserver's address, value: a list of requests
    */
-  private static Map<String, List<IntermediateEntry>> mergeHashShuffleRequest(List<IntermediateEntry> partitions) {
-    Map<String, List<IntermediateEntry>> mergedPartitions = new HashMap<String, List<IntermediateEntry>>();
+  private static Collection<FetchImpl> mergeShuffleRequest(ExecutionBlockId ebid, int partitionId,
+                                                          TajoWorkerProtocol.ShuffleType type,
+                                                          List<IntermediateEntry> partitions) {
+    Map<QueryUnit.PullHost, FetchImpl> mergedPartitions = new HashMap<QueryUnit.PullHost, FetchImpl>();
+
     for (IntermediateEntry partition : partitions) {
-      if (mergedPartitions.containsKey(partition.getPullAddress())) {
-        mergedPartitions.get(partition.getPullAddress()).add(partition);
+      QueryUnit.PullHost host = partition.getPullHost();
+      if (mergedPartitions.containsKey(host)) {
+        FetchImpl fetch = mergedPartitions.get(partition.getPullHost());
+        fetch.addPart(partition.getTaskId(), partition.getAttemptId());
       } else {
-        mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition));
+        FetchImpl fetch = new FetchImpl(host, type, ebid, partitionId);
+        fetch.addPart(partition.getTaskId(), partition.getAttemptId());
+        mergedPartitions.put(partition.getPullHost(), fetch);
       }
     }
-
-    return mergedPartitions;
+    return mergedPartitions.values();
   }
 
   public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -388,38 +388,39 @@ public class Repartitioner {
     FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
     SubQuery.scheduleFragment(subQuery, dummyFragment);
 
-    List<String> basicFetchURIs = new ArrayList<String>();
+    List<FetchImpl> fetches = new ArrayList<FetchImpl>();
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
     for (ExecutionBlock childBlock : childBlocks) {
       SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
       for (QueryUnit qu : childExecSM.getQueryUnits()) {
         for (IntermediateEntry p : qu.getIntermediateData()) {
-          String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(), childBlock.getId(), p.taskId, p.attemptId);
-          basicFetchURIs.add(uri);
+          FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+          fetch.addPart(p.getTaskId(), p.getAttemptId());
+          fetches.add(fetch);
         }
       }
     }
 
     boolean ascendingFirstKey = sortSpecs[0].isAscending();
-    SortedMap<TupleRange, Collection<URI>> map;
+    SortedMap<TupleRange, Collection<FetchImpl>> map;
     if (ascendingFirstKey) {
-      map = new TreeMap<TupleRange, Collection<URI>>();
+      map = new TreeMap<TupleRange, Collection<FetchImpl>>();
     } else {
-      map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator());
+      map = new TreeMap<TupleRange, Collection<FetchImpl>>(new TupleRange.DescendingTupleRangeComparator());
     }
 
-    Set<URI> uris;
+    Set<FetchImpl> fetchSet;
     try {
       RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
       for (int i = 0; i < ranges.length; i++) {
-        uris = new HashSet<URI>();
-        for (String uri: basicFetchURIs) {
+        fetchSet = new HashSet<FetchImpl>();
+        for (FetchImpl fetch: fetches) {
           String rangeParam =
               TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder);
-          URI finalUri = URI.create(uri + "&" + rangeParam);
-          uris.add(finalUri);
+          fetch.setRangeParams(rangeParam);
+          fetchSet.add(fetch);
         }
-        map.put(ranges[i], uris);
+        map.put(ranges[i], fetchSet);
       }
 
     } catch (UnsupportedEncodingException e) {
@@ -431,39 +432,24 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions,
+  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions,
                                                    String tableName, int num) {
     int i;
-    Map<String, List<URI>>[] fetchesArray = new Map[num];
+    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
     for (i = 0; i < num; i++) {
-      fetchesArray[i] = new HashMap<String, List<URI>>();
+      fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
     }
     i = 0;
-    for (Entry<?, Collection<URI>> entry : partitions.entrySet()) {
-      Collection<URI> value = entry.getValue();
+    for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
+      Collection<FetchImpl> value = entry.getValue();
       TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
       if (i == num) i = 0;
     }
-    for (Map<String, List<URI>> eachFetches : fetchesArray) {
+    for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
       SubQuery.scheduleFetches(subQuery, eachFetches);
     }
   }
 
-  public static String createBasicFetchUri(String hostName, int port,
-                                           ExecutionBlockId childSid,
-                                           int taskId, int attemptId) {
-    String scheme = "http://";
-    StringBuilder sb = new StringBuilder(scheme);
-    sb.append(hostName).append(":").append(port).append("/?")
-        .append("qid=").append(childSid.getQueryId().toString())
-        .append("&sid=").append(childSid.getId())
-        .append("&").append("ta=").append(taskId).append("_").append(attemptId)
-        .append("&").append("p=0")
-        .append("&").append("type=r");
-
-    return sb.toString();
-  }
-
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
                                                  SubQuery subQuery, DataChannel channel,
                                                  int maxNum) {
@@ -483,8 +469,8 @@ public class Repartitioner {
     fragments.add(frag);
     SubQuery.scheduleFragments(subQuery, fragments);
 
-    Map<String, List<IntermediateEntry>> hashedByHost;
-    Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>();
+    Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
+    Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>();
 
     for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
@@ -496,14 +482,15 @@ public class Repartitioner {
       Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
       for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
         hashedByHost = hashByHost(interm.getValue());
-        for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
-          Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
-              interm.getKey(), channel.getShuffleType(), e.getValue());
+        for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
 
-          if (finalFetchURI.containsKey(interm.getKey())) {
-            finalFetchURI.get(interm.getKey()).addAll(uris);
+          FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+              block.getId(), interm.getKey(), e.getValue());
+
+          if (finalFetches.containsKey(interm.getKey())) {
+            finalFetches.get(interm.getKey()).add(fetch);
           } else {
-            finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+            finalFetches.put(interm.getKey(), TUtil.newList(fetch));
           }
         }
       }
@@ -511,8 +498,8 @@ public class Repartitioner {
 
     GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
     // get a proper number of tasks
-    int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
-    LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
+    int determinedTaskNum = Math.min(maxNum, finalFetches.size());
+    LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
     if (groupby != null && groupby.getGroupingColumns().length == 0) {
       determinedTaskNum = 1;
       LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
@@ -521,61 +508,70 @@ public class Repartitioner {
     // set the proper number of tasks to the estimated task num
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
     // divide fetch uris into the the proper number of tasks in a round robin manner.
-    scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum);
+    scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
     LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
   }
 
-  public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
-                                       int partitionId, ShuffleType type, List<IntermediateEntry> entries) {
+
+  public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
     String scheme = "http://";
+
     StringBuilder urlPrefix = new StringBuilder(scheme);
-    urlPrefix.append(hostAndPort).append("/?")
-        .append("qid=").append(ebid.getQueryId().toString())
-        .append("&sid=").append(ebid.getId())
-        .append("&p=").append(partitionId)
+    urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
+        .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
+        .append("&sid=").append(fetch.getExecutionBlockId().getId())
+        .append("&p=").append(fetch.getPartitionId())
         .append("&type=");
-    if (type == HASH_SHUFFLE) {
+    if (fetch.getType() == HASH_SHUFFLE) {
       urlPrefix.append("h");
-    } else if (type == RANGE_SHUFFLE) {
-      urlPrefix.append("r");
+    } else if (fetch.getType() == RANGE_SHUFFLE) {
+      urlPrefix.append("r").append("&").append(fetch.getRangeParams());
     }
-    urlPrefix.append("&ta=");
-
-    // If the get request is longer than 2000 characters,
-    // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
-    // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
-    // The below code transforms a long request to multiple requests.
-    List<String> taskIdsParams = new ArrayList<String>();
-    boolean first = true;
-    StringBuilder taskIdListBuilder = new StringBuilder();
-    for (IntermediateEntry entry: entries) {
-      StringBuilder taskAttemptId = new StringBuilder();
-
-      if (!first) { // when comma is added?
-        taskAttemptId.append(",");
-      } else {
-        first = false;
-      }
 
-      taskAttemptId.append(entry.getTaskId()).append("_").
-          append(entry.getAttemptId());
-      if (taskIdListBuilder.length() + taskAttemptId.length()
-          > HTTP_REQUEST_MAXIMUM_LENGTH) {
+    List<URI> fetchURLs = new ArrayList<URI>();
+    if(includeParts){
+      // If the get request is longer than 2000 characters,
+      // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
+      // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+      // The below code transforms a long request to multiple requests.
+      List<String> taskIdsParams = new ArrayList<String>();
+      StringBuilder taskIdListBuilder = new StringBuilder();
+      List<Integer> taskIds = fetch.getTaskIds();
+      List<Integer> attemptIds = fetch.getAttemptIds();
+      boolean first = true;
+
+      for (int i = 0; i < taskIds.size(); i++) {
+        StringBuilder taskAttemptId = new StringBuilder();
+
+        if (!first) { // when comma is added?
+          taskAttemptId.append(",");
+        } else {
+          first = false;
+        }
+
+        int taskId = taskIds.get(i);
+        int attemptId = attemptIds.get(i);
+        taskAttemptId.append(taskId).append("_").append(attemptId);
+
+        if (taskIdListBuilder.length() + taskAttemptId.length()
+            > HTTP_REQUEST_MAXIMUM_LENGTH) {
+          taskIdsParams.add(taskIdListBuilder.toString());
+          taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
+        } else {
+          taskIdListBuilder.append(taskAttemptId);
+        }
+      }
+      // if the url params remain
+      if (taskIdListBuilder.length() > 0) {
         taskIdsParams.add(taskIdListBuilder.toString());
-        taskIdListBuilder = new StringBuilder(entry.getTaskId() + "_" + entry.getAttemptId());
-      } else {
-        taskIdListBuilder.append(taskAttemptId);
       }
-    }
-
-    // if the url params remain
-    if (taskIdListBuilder.length() > 0) {
-      taskIdsParams.add(taskIdListBuilder.toString());
-    }
 
-    Collection<URI> fetchURLs = new ArrayList<URI>();
-    for (String param : taskIdsParams) {
-      fetchURLs.add(URI.create(urlPrefix + param));
+      urlPrefix.append("&ta=");
+      for (String param : taskIdsParams) {
+        fetchURLs.add(URI.create(urlPrefix + param));
+      }
+    } else {
+      fetchURLs.add(URI.create(urlPrefix.toString()));
     }
 
     return fetchURLs;
@@ -594,16 +590,16 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static Map<String, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
-    Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();
+  public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
+    Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>();
 
-    String hostName;
+    QueryUnit.PullHost host;
     for (IntermediateEntry entry : entries) {
-      hostName = entry.getPullHost() + ":" + entry.getPullPort();
-      if (hashed.containsKey(hostName)) {
-        hashed.get(hostName).add(entry);
+      host = entry.getPullHost();
+      if (hashed.containsKey(host)) {
+        hashed.get(host).add(entry);
       } else {
-        hashed.put(hostName, TUtil.newList(entry));
+        hashed.put(host, TUtil.newList(entry));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 31c0efa..921bb3a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -56,9 +56,9 @@ import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
@@ -945,7 +945,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.getId(), leftFragment, rightFragments));
   }
 
-  public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
+  public static void scheduleFetches(SubQuery subQuery, Map<String, List<FetchImpl>> fetches) {
     subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
         subQuery.getId(), fetches));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
new file mode 100644
index 0000000..9d1f428
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Repartitioner;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * <code>FetchImpl</code> describes where a piece of intermediate data can be pulled from: the pull
+ * server host, the shuffle type, the execution block id, the partition id and the
+ * (task id, attempt id) pairs that are requested.
+ *
+ * <p>Instances are mutable. <code>taskIds</code> and <code>attemptIds</code> are parallel lists: the
+ * elements at the same index of both lists belong together.</p>
+ */
+public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> {
+  private QueryUnit.PullHost host;             // The pull server host information
+  private TajoWorkerProtocol.ShuffleType type; // hash or range partition method.
+  private ExecutionBlockId executionBlockId;   // The executionBlock id
+  private int partitionId;                     // The hash partition id
+  private String name;                         // The intermediate source name
+  private String rangeParams;                  // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+  private boolean hasNext = false;             // optional, if true, has more taskIds
+
+  private List<Integer> taskIds;               // repeated, the task ids
+  private List<Integer> attemptIds;            // repeated, the attempt ids
+
+  /**
+   * Creates a fetch without any task attempt. All other fields keep their Java defaults until the
+   * matching setter is called. Note that this class offers no setter for the pull host, so
+   * {@link #getProto()} cannot succeed on an instance created by this constructor.
+   */
+  public FetchImpl() {
+    taskIds = new ArrayList<Integer>();
+    attemptIds = new ArrayList<Integer>();
+  }
+
+  /**
+   * Restores a fetch from its protobuf form.
+   *
+   * @param proto the serialized fetch
+   */
+  public FetchImpl(TajoWorkerProtocol.FetchProto proto) {
+    this(new QueryUnit.PullHost(proto.getHost(), proto.getPort()),
+        proto.getType(),
+        new ExecutionBlockId(proto.getExecutionBlockId()),
+        proto.getPartitionId(),
+        proto.getRangeParams(),
+        proto.getHasNext(),
+        proto.getName(),
+        // Copy the repeated fields: a built protobuf message exposes them as read-only lists, so
+        // sharing those lists would make addPart() fail on a restored fetch.
+        new ArrayList<Integer>(proto.getTaskIdList()),
+        new ArrayList<Integer>(proto.getAttemptIdList()));
+  }
+
+  /**
+   * Creates a fetch that has no task attempt, range parameters or name yet.
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId) {
+    this(host, type, executionBlockId, partitionId, null, false, null,
+        new ArrayList<Integer>(), new ArrayList<Integer>());
+  }
+
+  /**
+   * Creates a fetch that requests the task attempt of every given intermediate entry.
+   *
+   * @param intermediateEntryList the entries whose (task id, attempt id) pairs are added, in order
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, List<QueryUnit.IntermediateEntry> intermediateEntryList) {
+    this(host, type, executionBlockId, partitionId, null, false, null,
+        new ArrayList<Integer>(), new ArrayList<Integer>());
+    for (QueryUnit.IntermediateEntry entry : intermediateEntryList){
+      addPart(entry.getTaskId(), entry.getAttemptId());
+    }
+  }
+
+  /**
+   * Creates a fully specified fetch. The given lists are stored as they are (not copied), so they
+   * must be modifiable if {@link #addPart(int, int)} is going to be called.
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, String rangeParams, boolean hasNext, String name,
+                   List<Integer> taskIds, List<Integer> attemptIds) {
+    this.host = host;
+    this.type = type;
+    this.executionBlockId = executionBlockId;
+    this.partitionId = partitionId;
+    this.rangeParams = rangeParams;
+    this.hasNext = hasNext;
+    this.name = name;
+    this.taskIds = taskIds;
+    this.attemptIds = attemptIds;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds);
+  }
+
+  /**
+   * Compares the same fields that {@link #hashCode()} hashes, so that equal fetches are treated as
+   * duplicates by hash-based collections.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof FetchImpl)) {
+      return false;
+    }
+    FetchImpl other = (FetchImpl) obj;
+    return partitionId == other.partitionId
+        && hasNext == other.hasNext
+        && Objects.equal(host, other.host)
+        && Objects.equal(type, other.type)
+        && Objects.equal(executionBlockId, other.executionBlockId)
+        && Objects.equal(name, other.name)
+        && Objects.equal(rangeParams, other.rangeParams)
+        && Objects.equal(taskIds, other.taskIds)
+        && Objects.equal(attemptIds, other.attemptIds);
+  }
+
+  /**
+   * Serializes this fetch into a protobuf message.
+   *
+   * @return a newly built message that reflects the current field values
+   * @throws NullPointerException if the pull host or the name is not set
+   * @throws IllegalArgumentException if the numbers of task ids and attempt ids differ
+   */
+  @Override
+  public TajoWorkerProtocol.FetchProto getProto() {
+    Preconditions.checkNotNull(host, "The pull host of a fetch must be set");
+    Preconditions.checkNotNull(name, "The name of a fetch must be set");
+
+    // Start from a fresh builder on every call. A builder kept between calls would get the task
+    // and attempt ids appended again each time, so later messages would contain them repeatedly.
+    TajoWorkerProtocol.FetchProto.Builder builder = TajoWorkerProtocol.FetchProto.newBuilder();
+    builder.setHost(host.getHost());
+    builder.setPort(host.getPort());
+    builder.setType(type);
+    builder.setExecutionBlockId(executionBlockId.getProto());
+    builder.setPartitionId(partitionId);
+    builder.setHasNext(hasNext);
+    builder.setName(name);
+    if (rangeParams != null && !rangeParams.isEmpty()) {
+      builder.setRangeParams(rangeParams);
+    }
+
+    Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+    builder.addAllTaskId(taskIds);
+    builder.addAllAttemptId(attemptIds);
+    return builder.build();
+  }
+
+  /**
+   * Adds one (task id, attempt id) pair to this fetch.
+   */
+  public void addPart(int taskId, int attemptId) {
+    this.taskIds.add(taskId);
+    this.attemptIds.add(attemptId);
+  }
+
+  public QueryUnit.PullHost getPullHost() {
+    return this.host;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public void setExecutionBlockId(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getRangeParams() {
+    return rangeParams;
+  }
+
+  public void setRangeParams(String rangeParams) {
+    this.rangeParams = rangeParams;
+  }
+
+  public boolean hasNext() {
+    return hasNext;
+  }
+
+  public void setHasNext(boolean hasNext) {
+    this.hasNext = hasNext;
+  }
+
+  public TajoWorkerProtocol.ShuffleType getType() {
+    return type;
+  }
+
+  public void setType(TajoWorkerProtocol.ShuffleType type) {
+    this.type = type;
+  }
+
+  /**
+   * Get the pull server URIs.
+   */
+  public List<URI> getURIs(){
+    return Repartitioner.createFetchURL(this, true);
+  }
+
+  /**
+   * Get the pull server URIs without repeated parameters.
+   */
+  public List<URI> getSimpleURIs(){
+    return Repartitioner.createFetchURL(this, false);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the internal list of task ids itself, not a copy.
+   */
+  public List<Integer> getTaskIds() {
+    return taskIds;
+  }
+
+  /**
+   * Returns the internal list of attempt ids itself, not a copy.
+   */
+  public List<Integer> getAttemptIds() {
+    return attemptIds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index bb136f7..a4836e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -181,7 +181,7 @@ public class Fetcher {
                   sb.append(name).append(" = ").append(value);
                 }
                 if (this.length == -1 && name.equals("Content-Length")) {
-                  this.length = Long.valueOf(value);
+                  this.length = Long.parseLong(value);
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index ef52fd0..5c252fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
@@ -54,7 +53,6 @@ import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.ApplicationIdUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.io.File;
@@ -192,8 +190,8 @@ public class Task {
 
     LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
     LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
-    for (Fetch f : request.getFetches()) {
-      LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
+    for (FetchImpl f : request.getFetches()) {
+      LOG.info("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
     }
     LOG.info("* Local task dir: " + taskDir);
     if(LOG.isDebugEnabled()) {
@@ -622,7 +620,7 @@ public class Task {
   }
 
   private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<Fetch> fetches) throws IOException {
+                                        List<FetchImpl> fetches) throws IOException {
 
     if (fetches.size() > 0) {
 
@@ -639,15 +637,17 @@ public class Task {
       int i = 0;
       File storeFile;
       List<Fetcher> runnerList = Lists.newArrayList();
-      for (Fetch f : fetches) {
-        storeDir = new File(inputDir.toString(), f.getName());
-        if (!storeDir.exists()) {
-          storeDir.mkdirs();
+      for (FetchImpl f : fetches) {
+        for (URI uri : f.getURIs()) {
+          storeDir = new File(inputDir.toString(), f.getName());
+          if (!storeDir.exists()) {
+            storeDir.mkdirs();
+          }
+          storeFile = new File(storeDir, "in_" + i);
+          Fetcher fetcher = new Fetcher(uri, storeFile, channelFactory);
+          runnerList.add(fetcher);
+          i++;
         }
-        storeFile = new File(storeDir, "in_" + i);
-        Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory);
-        runnerList.add(fetcher);
-        i++;
       }
       ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
       return runnerList;
@@ -737,19 +737,6 @@ public class Task {
       }
     }
   }
-
-  public static final String FILECACHE = "filecache";
-  public static final String APPCACHE = "appcache";
-  public static final String USERCACHE = "usercache";
-
-  String fileCache;
-  public String getFileCacheDir() {
-    fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
-        ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) +
-        "/" + "output";
-    return fileCache;
-  }
-
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
         StorageUtil.concatPath(

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 78da10f..5d4ae44 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -68,18 +68,30 @@ message QueryUnitRequestProto {
     required bool clusteredOutput = 4;
     required string serializedData = 5;
     optional bool interQuery = 6 [default = false];
-    repeated Fetch fetches = 7;
+    repeated FetchProto fetches = 7;
     optional bool shouldDie = 8;
     optional KeyValueSetProto queryContext = 9;
     optional DataChannelProto dataChannel = 10;
     optional EnforcerProto enforcer = 11;
 }
 
-message Fetch {
-    required string name = 1;
-    required string urls = 2;
+message FetchProto {
+    required string host = 1;
+    required int32 port = 2;
+    required ShuffleType type = 3;
+    required ExecutionBlockIdProto executionBlockId = 4;
+    required int32 partitionId = 5;
+    required string name = 6;
+    optional string rangeParams = 7;
+    optional bool hasNext = 8 [default = false];
+
+    //repeated part
+    repeated int32 taskId = 9 [packed=true];
+    repeated int32 attemptId = 10 [packed=true];
 }
 
+
+
 message QueryUnitResponseProto {
     required string id = 1;
     required QueryState status = 2;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 3e8dfef..06dca00 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -40,6 +40,7 @@
 <%@ page import="java.util.Set" %>
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
 <%@ page import="org.apache.tajo.worker.TaskHistory" %>
+<%@ page import="org.apache.tajo.worker.FetchImpl" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -108,11 +109,13 @@
 
     String fetchInfo = "";
     delim = "";
-    for (Map.Entry<String, Set<URI>> e : queryUnit.getFetchMap().entrySet()) {
+    for (Map.Entry<String, Set<FetchImpl>> e : queryUnit.getFetchMap().entrySet()) {
         fetchInfo += delim + "<b>" + e.getKey() + "</b>";
         delim = "<br/>";
-        for (URI t : e.getValue()) {
-            fetchInfo += delim + t;
+        for (FetchImpl f : e.getValue()) {
+            for (URI uri : f.getSimpleURIs()){
+                fetchInfo += delim + uri;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 987dc2a..0ccaebe 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -23,13 +23,16 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.Repartitioner;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.FetchImpl;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
 
 import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
 
@@ -44,12 +47,18 @@ public class TestRepartitioner {
 
     List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList();
     for (int i = 0; i < 1000; i++) {
-      intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, hostName, port));
+      intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port)));
     }
 
-    Collection<URI> uris = Repartitioner.
-        createHashFetchURL(hostName + ":" + port, sid, partitionId,
-                TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, intermediateEntries);
+    FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+        sid, partitionId, intermediateEntries);
+
+    fetch.setName(sid.toString());
+
+    TajoWorkerProtocol.FetchProto proto = fetch.getProto();
+    fetch = new FetchImpl(proto);
+    assertEquals(proto, fetch.getProto());
+    List<URI> uris = fetch.getURIs();
 
     List<String> taList = TUtil.newList();
     for (URI uri : uris) {