You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:10 UTC
[12/48] git commit: TAJO-789: Improve shuffle URI. (jinho)
TAJO-789: Improve shuffle URI. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8321d263
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8321d263
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8321d263
Branch: refs/heads/window_function
Commit: 8321d263e39987aa3bd075c8da163ee093cad420
Parents: 71f394d
Author: jinossy <ji...@gmail.com>
Authored: Mon Apr 28 20:22:40 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Apr 28 20:22:40 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/engine/query/QueryUnitRequest.java | 6 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 39 ++--
.../tajo/master/DefaultTaskScheduler.java | 10 +-
.../apache/tajo/master/FetchScheduleEvent.java | 8 +-
.../apache/tajo/master/LazyTaskScheduler.java | 8 +-
.../apache/tajo/master/ScheduledFetches.java | 11 +-
.../tajo/master/querymaster/QueryUnit.java | 103 +++++----
.../master/querymaster/QueryUnitAttempt.java | 4 +-
.../tajo/master/querymaster/Repartitioner.java | 224 +++++++++----------
.../tajo/master/querymaster/SubQuery.java | 4 +-
.../java/org/apache/tajo/worker/FetchImpl.java | 201 +++++++++++++++++
.../java/org/apache/tajo/worker/Fetcher.java | 2 +-
.../main/java/org/apache/tajo/worker/Task.java | 39 ++--
.../src/main/proto/TajoWorkerProtocol.proto | 20 +-
.../main/resources/webapps/worker/queryunit.jsp | 9 +-
.../apache/tajo/master/TestRepartitioner.java | 21 +-
17 files changed, 475 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 13f908a..2134b54 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-789: Improve shuffle URI. (jinho)
+
TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
index 383a787..dc9a63d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -27,8 +27,8 @@ import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.FetchImpl;
-import java.net.URI;
import java.util.List;
public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
@@ -40,8 +40,8 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
public String getSerializedData();
public boolean isInterQuery();
public void setInterQuery();
- public void addFetch(String name, URI uri);
- public List<TajoWorkerProtocol.Fetch> getFetches();
+ public void addFetch(String name, FetchImpl fetch);
+ public List<FetchImpl> getFetches();
public boolean shouldDie();
public void setShouldDie();
public QueryContext getQueryContext();
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index d4006e0..f1af2ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -21,11 +21,11 @@ package org.apache.tajo.engine.query;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
+import org.apache.tajo.worker.FetchImpl;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -40,7 +40,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
private boolean clusteredOutput;
private String serializedData; // logical node
private Boolean interQuery;
- private List<Fetch> fetches;
+ private List<FetchImpl> fetches;
private Boolean shouldDie;
private QueryContext queryContext;
private DataChannel dataChannel;
@@ -177,16 +177,13 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
maybeInitBuilder();
this.interQuery = true;
}
-
- public void addFetch(String name, URI uri) {
- maybeInitBuilder();
- initFetches();
- fetches.add(
- Fetch.newBuilder()
- .setName(name)
- .setUrls(uri.toString()).build());
-
- }
+
+ public void addFetch(String name, FetchImpl fetch) {
+ maybeInitBuilder();
+ initFetches();
+ fetch.setName(name);
+ fetches.add(fetch);
+ }
public QueryContext getQueryContext() {
QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -236,7 +233,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
return this.enforcer;
}
- public List<Fetch> getFetches() {
+ public List<FetchImpl> getFetches() {
initFetches();
return this.fetches;
@@ -247,9 +244,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
return;
}
QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
- this.fetches = new ArrayList<Fetch>();
- for(Fetch fetch : p.getFetchesList()) {
- fetches.add(fetch);
+ this.fetches = new ArrayList<FetchImpl>();
+ for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) {
+ fetches.add(new FetchImpl(fetch));
}
}
@@ -300,9 +297,11 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
if (this.interQuery != null) {
builder.setInterQuery(this.interQuery);
}
- if (this.fetches != null) {
- builder.addAllFetches(this.fetches);
- }
+ if (this.fetches != null) {
+ for (int i = 0; i < fetches.size(); i++) {
+ builder.addFetches(fetches.get(i).getProto());
+ }
+ }
if (this.shouldDie != null) {
builder.setShouldDie(this.shouldDie);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 9978670..5bfac8b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -43,8 +43,8 @@ import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
-import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
@@ -202,11 +202,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
} else if (event instanceof FetchScheduleEvent) {
FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
- Map<String, List<URI>> fetches = castEvent.getFetches();
+ Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
scheduledObjectNum++;
- for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+ for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
task.addFetches(eachFetch.getKey(), eachFetch.getValue());
task.addFragment(fragmentsForNonLeafTask[0], true);
if (fragmentsForNonLeafTask[1] != null) {
@@ -874,9 +874,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
taskAssign.setInterQuery();
}
for (ScanNode scan : task.getScanNodes()) {
- Collection<URI> fetches = task.getFetch(scan);
+ Collection<FetchImpl> fetches = task.getFetch(scan);
if (fetches != null) {
- for (URI fetch : fetches) {
+ for (FetchImpl fetch : fetches) {
taskAssign.addFetch(scan.getTableName(), fetch);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
index 561f980..21e376c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -20,21 +20,21 @@ package org.apache.tajo.master;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.worker.FetchImpl;
-import java.net.URI;
import java.util.List;
import java.util.Map;
public class FetchScheduleEvent extends TaskSchedulerEvent {
- private final Map<String, List<URI>> fetches;
+ private final Map<String, List<FetchImpl>> fetches;
public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
- final Map<String, List<URI>> fetches) {
+ final Map<String, List<FetchImpl>> fetches) {
super(eventType, blockId);
this.fetches = fetches;
}
- public Map<String, List<URI>> getFetches() {
+ public Map<String, List<FetchImpl>> getFetches() {
return fetches;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index dd82f28..6552998 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -40,9 +40,9 @@ import org.apache.tajo.master.querymaster.QueryUnitAttempt;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
-import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
@@ -484,11 +484,11 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
}
if (!context.isLeafQuery()) {
- Map<String, List<URI>> fetch = scheduledFetches.getNextFetch();
+ Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch();
scheduledFetches.popNextFetch();
- for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) {
- for (URI eachValue : fetchEntry.getValue()) {
+ for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) {
+ for (FetchImpl eachValue : fetchEntry.getValue()) {
taskAssign.addFetch(fetchEntry.getKey(), eachValue);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
index 9b7dc22..b05572b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
@@ -18,15 +18,16 @@
package org.apache.tajo.master;
-import java.net.URI;
+import org.apache.tajo.worker.FetchImpl;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ScheduledFetches {
- private List<Map<String, List<URI>>> fetches = new ArrayList<Map<String, List<URI>>>();
+ private List<Map<String, List<FetchImpl>>> fetches = new ArrayList<Map<String, List<FetchImpl>>>();
- public void addFetch(Map<String, List<URI>> fetch) {
+ public void addFetch(Map<String, List<FetchImpl>> fetch) {
this.fetches.add(fetch);
}
@@ -34,11 +35,11 @@ public class ScheduledFetches {
return fetches.size() > 0;
}
- public Map<String, List<URI>> getNextFetch() {
+ public Map<String, List<FetchImpl>> getNextFetch() {
return hasNextFetch() ? fetches.get(0) : null;
}
- public Map<String, List<URI>> popNextFetch() {
+ public Map<String, List<FetchImpl>> popNextFetch() {
return hasNextFetch() ? fetches.remove(0) : null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 34686da..27625b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -40,6 +40,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.FetchImpl;
import java.net.URI;
import java.util.*;
@@ -63,7 +64,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private List<ScanNode> scan;
private Map<String, Set<FragmentProto>> fragMap;
- private Map<String, Set<URI>> fetchMap;
+ private Map<String, Set<FetchImpl>> fetchMap;
private int totalFragmentNum;
@@ -269,18 +270,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return succeededHost;
}
- public void addFetches(String tableId, Collection<URI> urilist) {
- Set<URI> uris;
+ public void addFetches(String tableId, Collection<FetchImpl> fetches) {
+ Set<FetchImpl> fetchSet;
if (fetchMap.containsKey(tableId)) {
- uris = fetchMap.get(tableId);
+ fetchSet = fetchMap.get(tableId);
} else {
- uris = Sets.newHashSet();
+ fetchSet = Sets.newHashSet();
}
- uris.addAll(urilist);
- fetchMap.put(tableId, uris);
+ fetchSet.addAll(fetches);
+ fetchMap.put(tableId, fetchSet);
}
- public void setFetches(Map<String, Set<URI>> fetches) {
+ public void setFetches(Map<String, Set<FetchImpl>> fetches) {
this.fetchMap.clear();
this.fetchMap.putAll(fetches);
}
@@ -301,19 +302,19 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return taskId;
}
- public Collection<URI> getFetchHosts(String tableId) {
+ public Collection<FetchImpl> getFetchHosts(String tableId) {
return fetchMap.get(tableId);
}
- public Collection<Set<URI>> getFetches() {
+ public Collection<Set<FetchImpl>> getFetches() {
return fetchMap.values();
}
- public Map<String, Set<URI>> getFetchMap() {
+ public Map<String, Set<FetchImpl>> getFetchMap() {
return fetchMap;
}
- public Collection<URI> getFetch(ScanNode scan) {
+ public Collection<FetchImpl> getFetch(ScanNode scan) {
return this.fetchMap.get(scan.getTableName());
}
@@ -323,21 +324,24 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public String toString() {
- String str = new String(plan.getType() + " \n");
+ StringBuilder builder = new StringBuilder();
+ builder.append(plan.getType() + " \n");
for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
- str += e.getKey() + " : ";
+ builder.append(e.getKey()).append(" : ");
for (FragmentProto fragment : e.getValue()) {
- str += fragment + ", ";
+ builder.append(fragment).append(", ");
}
}
- for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
- str += e.getKey() + " : ";
- for (URI t : e.getValue()) {
- str += t + " ";
+ for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+ builder.append(e.getKey()).append(" : ");
+ for (FetchImpl t : e.getValue()) {
+ for (URI uri : t.getURIs()){
+ builder.append(uri).append(" ");
+ }
}
}
- return str;
+ return builder.toString();
}
public void setStats(TableStats stats) {
@@ -612,20 +616,52 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.intermediateData;
}
+ public static class PullHost {
+ String host;
+ int port;
+ public PullHost(String pullServerAddr, int pullServerPort){
+ this.host = pullServerAddr;
+ this.port = pullServerPort;
+ }
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public String getPullAddress() {
+ return host + ":" + port;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(host, port);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PullHost) {
+ PullHost other = (PullHost) obj;
+ return host.equals(other.host) && port == other.port;
+ }
+
+ return false;
+ }
+ }
+
public static class IntermediateEntry {
int taskId;
int attemptId;
int partId;
- String pullHost;
- int port;
+ PullHost host;
- public IntermediateEntry(int taskId, int attemptId, int partId,
- String pullServerAddr, int pullServerPort) {
+ public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
this.taskId = taskId;
this.attemptId = attemptId;
this.partId = partId;
- this.pullHost = pullServerAddr;
- this.port = pullServerPort;
+ this.host = host;
}
public int getTaskId() {
@@ -640,22 +676,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.partId;
}
- public String getPullHost() {
- return this.pullHost;
- }
-
- public int getPullPort() {
- return port;
- }
-
- public String getPullAddress() {
- return pullHost + ":" + port;
+ public PullHost getPullHost() {
+ return this.host;
}
@Override
public int hashCode() {
- return Objects.hashCode(taskId, attemptId, partId, pullHost, port);
+ return Objects.hashCode(taskId, partId, attemptId, host);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index b69742c..c3aae67 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -33,6 +33,7 @@ import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -273,9 +274,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
if (report.getShuffleFileOutputsCount() > 0) {
this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
+ PullHost host = new PullHost(getHost(), getPullServerPort());
for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
- getId().getId(), p.getPartId(), getHost(), getPullServerPort());
+ getId().getId(), p.getPartId(), host);
partitions.add(entry);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 31d433d..3a2e79f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -39,6 +39,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TaskSchedulerContext;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -47,6 +48,7 @@ import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -55,7 +57,6 @@ import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
@@ -66,7 +67,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
public class Repartitioner {
private static final Log LOG = LogFactory.getLog(Repartitioner.class);
- private static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+ private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
private final static String UNKNOWN_HOST = "unknown";
public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
@@ -282,23 +283,16 @@ public class Repartitioner {
private static void addJoinShuffle(SubQuery subQuery, int partitionId,
Map<String, List<IntermediateEntry>> grouppedPartitions) {
- Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
+ Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>();
for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
- Map<String, List<IntermediateEntry>> requests;
+ Collection<FetchImpl> requests;
if (grouppedPartitions.containsKey(execBlock.getId().toString())) {
- requests = mergeHashShuffleRequest(grouppedPartitions.get(execBlock.getId().toString()));
+ requests = mergeShuffleRequest(execBlock.getId(), partitionId, HASH_SHUFFLE,
+ grouppedPartitions.get(execBlock.getId().toString()));
} else {
return;
}
- Set<URI> fetchURIs = TUtil.newHashSet();
- for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
- Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
- execBlock.getId(),
- partitionId, HASH_SHUFFLE,
- requestPerNode.getValue());
- fetchURIs.addAll(uris);
- }
- fetches.put(execBlock.getId().toString(), Lists.newArrayList(fetchURIs));
+ fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
}
SubQuery.scheduleFetches(subQuery, fetches);
}
@@ -309,17 +303,23 @@ public class Repartitioner {
*
* @return key: pullserver's address, value: a list of requests
*/
- private static Map<String, List<IntermediateEntry>> mergeHashShuffleRequest(List<IntermediateEntry> partitions) {
- Map<String, List<IntermediateEntry>> mergedPartitions = new HashMap<String, List<IntermediateEntry>>();
+ private static Collection<FetchImpl> mergeShuffleRequest(ExecutionBlockId ebid, int partitionId,
+ TajoWorkerProtocol.ShuffleType type,
+ List<IntermediateEntry> partitions) {
+ Map<QueryUnit.PullHost, FetchImpl> mergedPartitions = new HashMap<QueryUnit.PullHost, FetchImpl>();
+
for (IntermediateEntry partition : partitions) {
- if (mergedPartitions.containsKey(partition.getPullAddress())) {
- mergedPartitions.get(partition.getPullAddress()).add(partition);
+ QueryUnit.PullHost host = partition.getPullHost();
+ if (mergedPartitions.containsKey(host)) {
+ FetchImpl fetch = mergedPartitions.get(partition.getPullHost());
+ fetch.addPart(partition.getTaskId(), partition.getAttemptId());
} else {
- mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition));
+ FetchImpl fetch = new FetchImpl(host, type, ebid, partitionId);
+ fetch.addPart(partition.getTaskId(), partition.getAttemptId());
+ mergedPartitions.put(partition.getPullHost(), fetch);
}
}
-
- return mergedPartitions;
+ return mergedPartitions.values();
}
public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -388,38 +388,39 @@ public class Repartitioner {
FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
SubQuery.scheduleFragment(subQuery, dummyFragment);
- List<String> basicFetchURIs = new ArrayList<String>();
+ List<FetchImpl> fetches = new ArrayList<FetchImpl>();
List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
for (ExecutionBlock childBlock : childBlocks) {
SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
for (QueryUnit qu : childExecSM.getQueryUnits()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
- String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(), childBlock.getId(), p.taskId, p.attemptId);
- basicFetchURIs.add(uri);
+ FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+ fetch.addPart(p.getTaskId(), p.getAttemptId());
+ fetches.add(fetch);
}
}
}
boolean ascendingFirstKey = sortSpecs[0].isAscending();
- SortedMap<TupleRange, Collection<URI>> map;
+ SortedMap<TupleRange, Collection<FetchImpl>> map;
if (ascendingFirstKey) {
- map = new TreeMap<TupleRange, Collection<URI>>();
+ map = new TreeMap<TupleRange, Collection<FetchImpl>>();
} else {
- map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator());
+ map = new TreeMap<TupleRange, Collection<FetchImpl>>(new TupleRange.DescendingTupleRangeComparator());
}
- Set<URI> uris;
+ Set<FetchImpl> fetchSet;
try {
RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
for (int i = 0; i < ranges.length; i++) {
- uris = new HashSet<URI>();
- for (String uri: basicFetchURIs) {
+ fetchSet = new HashSet<FetchImpl>();
+ for (FetchImpl fetch: fetches) {
String rangeParam =
TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder);
- URI finalUri = URI.create(uri + "&" + rangeParam);
- uris.add(finalUri);
+ fetch.setRangeParams(rangeParam);
+ fetchSet.add(fetch);
}
- map.put(ranges[i], uris);
+ map.put(ranges[i], fetchSet);
}
} catch (UnsupportedEncodingException e) {
@@ -431,39 +432,24 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
}
- public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions,
+ public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions,
String tableName, int num) {
int i;
- Map<String, List<URI>>[] fetchesArray = new Map[num];
+ Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
for (i = 0; i < num; i++) {
- fetchesArray[i] = new HashMap<String, List<URI>>();
+ fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
}
i = 0;
- for (Entry<?, Collection<URI>> entry : partitions.entrySet()) {
- Collection<URI> value = entry.getValue();
+ for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
+ Collection<FetchImpl> value = entry.getValue();
TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
if (i == num) i = 0;
}
- for (Map<String, List<URI>> eachFetches : fetchesArray) {
+ for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
SubQuery.scheduleFetches(subQuery, eachFetches);
}
}
- public static String createBasicFetchUri(String hostName, int port,
- ExecutionBlockId childSid,
- int taskId, int attemptId) {
- String scheme = "http://";
- StringBuilder sb = new StringBuilder(scheme);
- sb.append(hostName).append(":").append(port).append("/?")
- .append("qid=").append(childSid.getQueryId().toString())
- .append("&sid=").append(childSid.getId())
- .append("&").append("ta=").append(taskId).append("_").append(attemptId)
- .append("&").append("p=0")
- .append("&").append("type=r");
-
- return sb.toString();
- }
-
public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
SubQuery subQuery, DataChannel channel,
int maxNum) {
@@ -483,8 +469,8 @@ public class Repartitioner {
fragments.add(frag);
SubQuery.scheduleFragments(subQuery, fragments);
- Map<String, List<IntermediateEntry>> hashedByHost;
- Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>();
+ Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
+ Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>();
for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
@@ -496,14 +482,15 @@ public class Repartitioner {
Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
hashedByHost = hashByHost(interm.getValue());
- for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
- Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
- interm.getKey(), channel.getShuffleType(), e.getValue());
+ for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
- if (finalFetchURI.containsKey(interm.getKey())) {
- finalFetchURI.get(interm.getKey()).addAll(uris);
+ FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+ block.getId(), interm.getKey(), e.getValue());
+
+ if (finalFetches.containsKey(interm.getKey())) {
+ finalFetches.get(interm.getKey()).add(fetch);
} else {
- finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+ finalFetches.put(interm.getKey(), TUtil.newList(fetch));
}
}
}
@@ -511,8 +498,8 @@ public class Repartitioner {
GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
// get a proper number of tasks
- int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
- LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
+ int determinedTaskNum = Math.min(maxNum, finalFetches.size());
+ LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
if (groupby != null && groupby.getGroupingColumns().length == 0) {
determinedTaskNum = 1;
LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
@@ -521,61 +508,70 @@ public class Repartitioner {
// set the proper number of tasks to the estimated task num
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
// divide fetch uris into the proper number of tasks in a round robin manner.
- scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum);
+ scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
}
- public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
- int partitionId, ShuffleType type, List<IntermediateEntry> entries) {
+
+ public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
String scheme = "http://";
+
StringBuilder urlPrefix = new StringBuilder(scheme);
- urlPrefix.append(hostAndPort).append("/?")
- .append("qid=").append(ebid.getQueryId().toString())
- .append("&sid=").append(ebid.getId())
- .append("&p=").append(partitionId)
+ urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
+ .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
+ .append("&sid=").append(fetch.getExecutionBlockId().getId())
+ .append("&p=").append(fetch.getPartitionId())
.append("&type=");
- if (type == HASH_SHUFFLE) {
+ if (fetch.getType() == HASH_SHUFFLE) {
urlPrefix.append("h");
- } else if (type == RANGE_SHUFFLE) {
- urlPrefix.append("r");
+ } else if (fetch.getType() == RANGE_SHUFFLE) {
+ urlPrefix.append("r").append("&").append(fetch.getRangeParams());
}
- urlPrefix.append("&ta=");
-
- // If the get request is longer than 2000 characters,
- // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
- // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
- // The below code transforms a long request to multiple requests.
- List<String> taskIdsParams = new ArrayList<String>();
- boolean first = true;
- StringBuilder taskIdListBuilder = new StringBuilder();
- for (IntermediateEntry entry: entries) {
- StringBuilder taskAttemptId = new StringBuilder();
-
- if (!first) { // when comma is added?
- taskAttemptId.append(",");
- } else {
- first = false;
- }
- taskAttemptId.append(entry.getTaskId()).append("_").
- append(entry.getAttemptId());
- if (taskIdListBuilder.length() + taskAttemptId.length()
- > HTTP_REQUEST_MAXIMUM_LENGTH) {
+ List<URI> fetchURLs = new ArrayList<URI>();
+ if(includeParts){
+ // If the get request is longer than 2000 characters,
+ // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
+ // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+ // The below code transforms a long request to multiple requests.
+ List<String> taskIdsParams = new ArrayList<String>();
+ StringBuilder taskIdListBuilder = new StringBuilder();
+ List<Integer> taskIds = fetch.getTaskIds();
+ List<Integer> attemptIds = fetch.getAttemptIds();
+ boolean first = true;
+
+ for (int i = 0; i < taskIds.size(); i++) {
+ StringBuilder taskAttemptId = new StringBuilder();
+
+ if (!first) { // when comma is added?
+ taskAttemptId.append(",");
+ } else {
+ first = false;
+ }
+
+ int taskId = taskIds.get(i);
+ int attemptId = attemptIds.get(i);
+ taskAttemptId.append(taskId).append("_").append(attemptId);
+
+ if (taskIdListBuilder.length() + taskAttemptId.length()
+ > HTTP_REQUEST_MAXIMUM_LENGTH) {
+ taskIdsParams.add(taskIdListBuilder.toString());
+ taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
+ } else {
+ taskIdListBuilder.append(taskAttemptId);
+ }
+ }
+ // if the url params remain
+ if (taskIdListBuilder.length() > 0) {
taskIdsParams.add(taskIdListBuilder.toString());
- taskIdListBuilder = new StringBuilder(entry.getTaskId() + "_" + entry.getAttemptId());
- } else {
- taskIdListBuilder.append(taskAttemptId);
}
- }
-
- // if the url params remain
- if (taskIdListBuilder.length() > 0) {
- taskIdsParams.add(taskIdListBuilder.toString());
- }
- Collection<URI> fetchURLs = new ArrayList<URI>();
- for (String param : taskIdsParams) {
- fetchURLs.add(URI.create(urlPrefix + param));
+ urlPrefix.append("&ta=");
+ for (String param : taskIdsParams) {
+ fetchURLs.add(URI.create(urlPrefix + param));
+ }
+ } else {
+ fetchURLs.add(URI.create(urlPrefix.toString()));
}
return fetchURLs;
@@ -594,16 +590,16 @@ public class Repartitioner {
return hashed;
}
- public static Map<String, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
- Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();
+ public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
+ Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>();
- String hostName;
+ QueryUnit.PullHost host;
for (IntermediateEntry entry : entries) {
- hostName = entry.getPullHost() + ":" + entry.getPullPort();
- if (hashed.containsKey(hostName)) {
- hashed.get(hostName).add(entry);
+ host = entry.getPullHost();
+ if (hashed.containsKey(host)) {
+ hashed.get(host).add(entry);
} else {
- hashed.put(hostName, TUtil.newList(entry));
+ hashed.put(host, TUtil.newList(entry));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 31c0efa..921bb3a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -56,9 +56,9 @@ import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
-import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
@@ -945,7 +945,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.getId(), leftFragment, rightFragments));
}
- public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
+ public static void scheduleFetches(SubQuery subQuery, Map<String, List<FetchImpl>> fetches) {
subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
subQuery.getId(), fetches));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
new file mode 100644
index 0000000..9d1f428
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Repartitioner;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * <code>FetchImpl</code> describes where a piece of intermediate data can be pulled from:
+ * the pull server host, the shuffle type, the producing execution block, the partition id and
+ * the (task id, attempt id) pairs whose output should be fetched. It is converted to and from
+ * {@link TajoWorkerProtocol.FetchProto}, and the actual pull server URIs are derived from it by
+ * {@link Repartitioner#createFetchURL(FetchImpl, boolean)}.
+ *
+ * <p>Instances are mutable, and {@link #equals(Object)} / {@link #hashCode()} are computed from
+ * the current field values. Do not modify an instance while it is stored in a hash-based
+ * collection.
+ */
+public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> {
+  private TajoWorkerProtocol.FetchProto.Builder builder = null;
+
+  private QueryUnit.PullHost host;             // The pull server host information
+  private TajoWorkerProtocol.ShuffleType type; // hash or range partition method.
+  private ExecutionBlockId executionBlockId;   // The executionBlock id
+  private int partitionId;                     // The hash partition id
+  private String name;                         // The intermediate source name
+  private String rangeParams;                  // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+  private boolean hasNext = false;             // optional, if true, has more taskIds
+
+  // taskIds.get(i) and attemptIds.get(i) together identify one task attempt,
+  // so both lists must always have the same size.
+  private List<Integer> taskIds;               // repeated, the task ids
+  private List<Integer> attemptIds;            // repeated, the attempt ids
+
+  /**
+   * Creates an empty fetch. The host, type, execution block id and name are left unset.
+   */
+  public FetchImpl() {
+    builder = TajoWorkerProtocol.FetchProto.newBuilder();
+    taskIds = new ArrayList<Integer>();
+    attemptIds = new ArrayList<Integer>();
+  }
+
+  /**
+   * Restores a fetch from its protocol buffer form.
+   *
+   * @param proto the serialized fetch
+   */
+  public FetchImpl(TajoWorkerProtocol.FetchProto proto) {
+    this(new QueryUnit.PullHost(proto.getHost(), proto.getPort()),
+        proto.getType(),
+        new ExecutionBlockId(proto.getExecutionBlockId()),
+        proto.getPartitionId(),
+        proto.getRangeParams(),
+        proto.getHasNext(),
+        proto.getName(),
+        // Copy the repeated fields: the lists handed out by a built message are read-only,
+        // and addPart() must keep working on a deserialized instance.
+        new ArrayList<Integer>(proto.getTaskIdList()),
+        new ArrayList<Integer>(proto.getAttemptIdList()));
+  }
+
+  /**
+   * Creates a fetch without any task attempt; add them later with {@link #addPart(int, int)}.
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId) {
+    this(host, type, executionBlockId, partitionId, null, false, null,
+        new ArrayList<Integer>(), new ArrayList<Integer>());
+  }
+
+  /**
+   * Creates a fetch covering the task attempts of the given intermediate entries.
+   *
+   * @param intermediateEntryList entries whose task id and attempt id are added, in order
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, List<QueryUnit.IntermediateEntry> intermediateEntryList) {
+    this(host, type, executionBlockId, partitionId, null, false, null,
+        new ArrayList<Integer>(), new ArrayList<Integer>());
+    for (QueryUnit.IntermediateEntry entry : intermediateEntryList) {
+      addPart(entry.getTaskId(), entry.getAttemptId());
+    }
+  }
+
+  /**
+   * Creates a fully specified fetch. The given lists are stored as-is (not copied), so they
+   * must be mutable if {@link #addPart(int, int)} is going to be called.
+   */
+  public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
+                   int partitionId, String rangeParams, boolean hasNext, String name,
+                   List<Integer> taskIds, List<Integer> attemptIds) {
+    this.host = host;
+    this.type = type;
+    this.executionBlockId = executionBlockId;
+    this.partitionId = partitionId;
+    this.rangeParams = rangeParams;
+    this.hasNext = hasNext;
+    this.name = name;
+    this.taskIds = taskIds;
+    this.attemptIds = attemptIds;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds);
+  }
+
+  /**
+   * Value equality over the same fields that {@link #hashCode()} uses, so that two fetches
+   * describing the same data are treated as duplicates by sets and maps.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    FetchImpl other = (FetchImpl) obj;
+    // NOTE(review): relies on PullHost and ExecutionBlockId providing value-based equals(),
+    // as hashCode() already relies on their hashCode() - confirm.
+    return partitionId == other.partitionId
+        && hasNext == other.hasNext
+        && Objects.equal(host, other.host)
+        && Objects.equal(type, other.type)
+        && Objects.equal(executionBlockId, other.executionBlockId)
+        && Objects.equal(name, other.name)
+        && Objects.equal(rangeParams, other.rangeParams)
+        && Objects.equal(taskIds, other.taskIds)
+        && Objects.equal(attemptIds, other.attemptIds);
+  }
+
+  /**
+   * Serializes the current state. The host, type, execution block id and name must have been
+   * set, because the proto declares them as required fields.
+   *
+   * @return a message reflecting the field values at the time of the call
+   * @throws IllegalArgumentException if the task id and attempt id lists differ in size
+   */
+  @Override
+  public TajoWorkerProtocol.FetchProto getProto() {
+    if (builder == null) {
+      builder = TajoWorkerProtocol.FetchProto.newBuilder();
+    } else {
+      // The builder is reused across calls. Without resetting it, the repeated task and
+      // attempt ids would be appended again on every call, and a rangeParams value that has
+      // since been cleared would still be emitted.
+      builder.clear();
+    }
+    builder.setHost(host.getHost());
+    builder.setPort(host.getPort());
+    builder.setType(type);
+    builder.setExecutionBlockId(executionBlockId.getProto());
+    builder.setPartitionId(partitionId);
+    builder.setHasNext(hasNext);
+    builder.setName(name);
+    if (rangeParams != null && !rangeParams.isEmpty()) {
+      builder.setRangeParams(rangeParams);
+    }
+
+    Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+    builder.addAllTaskId(taskIds);
+    builder.addAllAttemptId(attemptIds);
+    return builder.build();
+  }
+
+  /**
+   * Adds one task attempt whose output should be fetched.
+   */
+  public void addPart(int taskId, int attemptId) {
+    this.taskIds.add(taskId);
+    this.attemptIds.add(attemptId);
+  }
+
+  public QueryUnit.PullHost getPullHost() {
+    return this.host;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public void setExecutionBlockId(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getRangeParams() {
+    return rangeParams;
+  }
+
+  public void setRangeParams(String rangeParams) {
+    this.rangeParams = rangeParams;
+  }
+
+  public boolean hasNext() {
+    return hasNext;
+  }
+
+  public void setHasNext(boolean hasNext) {
+    this.hasNext = hasNext;
+  }
+
+  public TajoWorkerProtocol.ShuffleType getType() {
+    return type;
+  }
+
+  public void setType(TajoWorkerProtocol.ShuffleType type) {
+    this.type = type;
+  }
+
+  /**
+   * Get the pull server URIs.
+   */
+  public List<URI> getURIs() {
+    return Repartitioner.createFetchURL(this, true);
+  }
+
+  /**
+   * Get the pull server URIs without repeated parameters.
+   */
+  public List<URI> getSimpleURIs() {
+    return Repartitioner.createFetchURL(this, false);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the internal task id list (not a copy); index-aligned with {@link #getAttemptIds()}
+   */
+  public List<Integer> getTaskIds() {
+    return taskIds;
+  }
+
+  /**
+   * @return the internal attempt id list (not a copy); index-aligned with {@link #getTaskIds()}
+   */
+  public List<Integer> getAttemptIds() {
+    return attemptIds;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index bb136f7..a4836e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -181,7 +181,7 @@ public class Fetcher {
sb.append(name).append(" = ").append(value);
}
if (this.length == -1 && name.equals("Content-Length")) {
- this.length = Long.valueOf(value);
+ this.length = Long.parseLong(value);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index ef52fd0..5c252fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.TaskAttemptState;
@@ -54,7 +53,6 @@ import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.ApplicationIdUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.io.File;
@@ -192,8 +190,8 @@ public class Task {
LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
- for (Fetch f : request.getFetches()) {
- LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
+ for (FetchImpl f : request.getFetches()) {
+ LOG.info("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
}
LOG.info("* Local task dir: " + taskDir);
if(LOG.isDebugEnabled()) {
@@ -622,7 +620,7 @@ public class Task {
}
private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
- List<Fetch> fetches) throws IOException {
+ List<FetchImpl> fetches) throws IOException {
if (fetches.size() > 0) {
@@ -639,15 +637,17 @@ public class Task {
int i = 0;
File storeFile;
List<Fetcher> runnerList = Lists.newArrayList();
- for (Fetch f : fetches) {
- storeDir = new File(inputDir.toString(), f.getName());
- if (!storeDir.exists()) {
- storeDir.mkdirs();
+ for (FetchImpl f : fetches) {
+ for (URI uri : f.getURIs()) {
+ storeDir = new File(inputDir.toString(), f.getName());
+ if (!storeDir.exists()) {
+ storeDir.mkdirs();
+ }
+ storeFile = new File(storeDir, "in_" + i);
+ Fetcher fetcher = new Fetcher(uri, storeFile, channelFactory);
+ runnerList.add(fetcher);
+ i++;
}
- storeFile = new File(storeDir, "in_" + i);
- Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory);
- runnerList.add(fetcher);
- i++;
}
ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
return runnerList;
@@ -737,19 +737,6 @@ public class Task {
}
}
}
-
- public static final String FILECACHE = "filecache";
- public static final String APPCACHE = "appcache";
- public static final String USERCACHE = "usercache";
-
- String fileCache;
- public String getFileCacheDir() {
- fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
- ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) +
- "/" + "output";
- return fileCache;
- }
-
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
StorageUtil.concatPath(
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 78da10f..5d4ae44 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -68,18 +68,30 @@ message QueryUnitRequestProto {
required bool clusteredOutput = 4;
required string serializedData = 5;
optional bool interQuery = 6 [default = false];
- repeated Fetch fetches = 7;
+ repeated FetchProto fetches = 7;
optional bool shouldDie = 8;
optional KeyValueSetProto queryContext = 9;
optional DataChannelProto dataChannel = 10;
optional EnforcerProto enforcer = 11;
}
-message Fetch {
- required string name = 1;
- required string urls = 2;
+message FetchProto {
+ required string host = 1;
+ required int32 port = 2;
+ required ShuffleType type = 3;
+ required ExecutionBlockIdProto executionBlockId = 4;
+ required int32 partitionId = 5;
+ required string name = 6;
+ optional string rangeParams = 7;
+ optional bool hasNext = 8 [default = false];
+
+ // repeated part: taskId[i] and attemptId[i] together identify one task attempt
+ repeated int32 taskId = 9 [packed=true];
+ repeated int32 attemptId = 10 [packed=true];
}
+
+
message QueryUnitResponseProto {
required string id = 1;
required QueryState status = 2;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 3e8dfef..06dca00 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -40,6 +40,7 @@
<%@ page import="java.util.Set" %>
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
<%@ page import="org.apache.tajo.worker.TaskHistory" %>
+<%@ page import="org.apache.tajo.worker.FetchImpl" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -108,11 +109,13 @@
String fetchInfo = "";
delim = "";
- for (Map.Entry<String, Set<URI>> e : queryUnit.getFetchMap().entrySet()) {
+ for (Map.Entry<String, Set<FetchImpl>> e : queryUnit.getFetchMap().entrySet()) {
fetchInfo += delim + "<b>" + e.getKey() + "</b>";
delim = "<br/>";
- for (URI t : e.getValue()) {
- fetchInfo += delim + t;
+ for (FetchImpl f : e.getValue()) {
+ for (URI uri : f.getSimpleURIs()){
+ fetchInfo += delim + uri;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 987dc2a..0ccaebe 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -23,13 +23,16 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TestTajoIds;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.Repartitioner;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.FetchImpl;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.Test;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import static junit.framework.Assert.assertEquals;
@@ -44,12 +47,18 @@ public class TestRepartitioner {
List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList();
for (int i = 0; i < 1000; i++) {
- intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, hostName, port));
+ intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port)));
}
- Collection<URI> uris = Repartitioner.
- createHashFetchURL(hostName + ":" + port, sid, partitionId,
- TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, intermediateEntries);
+ FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+ sid, partitionId, intermediateEntries);
+
+ fetch.setName(sid.toString());
+
+ TajoWorkerProtocol.FetchProto proto = fetch.getProto();
+ fetch = new FetchImpl(proto);
+ assertEquals(proto, fetch.getProto());
+ List<URI> uris = fetch.getURIs();
List<String> taList = TUtil.newList();
for (URI uri : uris) {