You are viewing a plain-text version of this content. The canonical link to the original ("here") was a hyperlink and is not preserved in this version.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/02 09:26:08 UTC
[1/2] TAJO-464: Rename the name 'partition',
actually meaning shuffle to 'shuffle'. (hyunsik)
Updated Branches:
refs/heads/master df5727c49 -> bb7e6b6b3
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 86d8cf2..c39c06e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -56,7 +56,8 @@ public class TaskAttemptContext {
private boolean needFetch = false;
private CountDownLatch doneFetchPhaseSignal;
private float progress = 0;
- private Map<Integer, String> repartitions;
+ /** a map of shuffled file outputs */
+ private Map<Integer, String> shuffleFileOutputs;
private File fetchIn;
private boolean stopped = false;
private boolean interQuery = false;
@@ -81,7 +82,7 @@ public class TaskAttemptContext {
}
this.workDir = workDir;
- this.repartitions = Maps.newHashMap();
+ this.shuffleFileOutputs = Maps.newHashMap();
state = TaskAttemptState.TA_PENDING;
}
@@ -175,12 +176,12 @@ public class TaskAttemptContext {
return doneFetchPhaseSignal;
}
- public void addRepartition(int partKey, String path) {
- repartitions.put(partKey, path);
+ public void addShuffleFileOutput(int partId, String fileName) {
+ shuffleFileOutputs.put(partId, fileName);
}
- public Iterator<Entry<Integer,String>> getRepartitions() {
- return repartitions.entrySet().iterator();
+ public Iterator<Entry<Integer,String>> getShuffleFileOutputs() {
+ return shuffleFileOutputs.entrySet().iterator();
}
public void updateAssignedFragments(String tableId, Fragment[] fragments) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index c5e7ad0..9aa6d86 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -35,14 +35,14 @@ message TaskStatusProto {
required TaskAttemptState state = 4;
optional StatSetProto stats = 5;
optional TableStatsProto resultStats = 6;
- repeated Partition partitions = 7;
+ repeated ShuffleFileOutput shuffleFileOutputs = 7;
}
message TaskCompletionReport {
required QueryUnitAttemptIdProto id = 1;
optional StatSetProto stats = 2;
optional TableStatsProto resultStats = 3;
- repeated Partition partitions = 4;
+ repeated ShuffleFileOutput shuffleFileOutputs = 4;
}
message TaskFatalErrorReport {
@@ -100,8 +100,8 @@ enum CommandType {
FINALIZE = 3;
}
-message Partition {
- required int32 partitionKey = 1;
+message ShuffleFileOutput {
+ required int32 partId = 1;
optional string fileName = 2;
}
@@ -117,11 +117,10 @@ message GetTaskRequestProto {
required ExecutionBlockIdProto executionBlockId = 2;
}
-enum PartitionType {
- NONE_PARTITION = 0;
- LIST_PARTITION = 1;
- HASH_PARTITION = 2;
- RANGE_PARTITION = 3;
+enum ShuffleType {
+ NONE_SHUFFLE = 0;
+ HASH_SHUFFLE = 1;
+ RANGE_SHUFFLE = 2;
}
enum TransmitType {
@@ -135,12 +134,12 @@ message DataChannelProto {
required ExecutionBlockIdProto targetId = 2;
required TransmitType transmitType = 3 [default = PULL_TRANSMIT];
- required PartitionType partitionType = 4;
+ required ShuffleType shuffleType = 4;
optional SchemaProto schema = 5;
- repeated ColumnProto partitionKey = 7;
- optional int32 partitionNum = 9 [default = 1];
+ repeated ColumnProto shuffleKeys = 7;
+ optional int32 numOutputs = 9 [default = 1];
optional StoreType storeType = 10 [default = CSV];
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryplan.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryplan.jsp
index aeb2c74..ec860b9 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryplan.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryplan.jsp
@@ -21,13 +21,11 @@
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="org.apache.hadoop.conf.Configuration" %>
<%@ page import="org.apache.tajo.master.querymaster.Query" %>
<%@ page import="org.apache.tajo.QueryId" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
-<%@ page import="java.text.DecimalFormat" %>
<%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %>
<%@ page import="java.util.*" %>
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
@@ -134,7 +132,7 @@
String outgoing = "";
String prefix = "";
for (DataChannel channel : masterPlan.getOutgoingChannels(eachSubQueryInfo.executionBlock.getId())) {
- outgoing += prefix + channel.getPartitionType();
+ outgoing += prefix + channel.getShuffleType();
prefix = "; ";
}
%>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
index 7dcb0b3..61fa202 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
@@ -125,9 +125,9 @@
String partitionKey = "-";
String partitionFileName = "-";
if(numPartitions > 0) {
- TajoWorkerProtocol.Partition partition = queryUnit.getPartitions().get(0);
- partitionKey = "" + partition.getPartitionKey();
- partitionFileName = partition.getFileName();
+ TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = queryUnit.getShuffleFileOutputs().get(0);
+ partitionKey = "" + shuffleFileOutputs.getPartId();
+ partitionFileName = shuffleFileOutputs.getFileName();
}
//int numIntermediateData = queryUnit.getIntermediateData() == null ? 0 : queryUnit.getIntermediateData().size();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
index 960ee57..ab56bea 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -35,11 +35,11 @@ public class TestMasterPlan {
ExecutionBlock eb2 = masterPlan.newExecutionBlock();
ExecutionBlock eb3 = masterPlan.newExecutionBlock();
- masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.PartitionType.LIST_PARTITION);
+ masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
- masterPlan.addConnect(eb3, eb2, TajoWorkerProtocol.PartitionType.LIST_PARTITION);
+ masterPlan.addConnect(eb3, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 1a46af6..509e72e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -43,6 +43,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -62,7 +63,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
import static org.junit.Assert.*;
@@ -450,8 +451,8 @@ public class TestPhysicalPlanner {
Column key1 = new Column("score.deptName", Type.TEXT);
Column key2 = new Column("score.class", Type.TEXT);
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.HASH_PARTITION, numPartitions);
- dataChannel.setPartitionKey(new Column[]{key1, key2});
+ ShuffleType.HASH_SHUFFLE, numPartitions);
+ dataChannel.setShuffleKeys(new Column[]{key1, key2});
ctx.setDataChannel(dataChannel);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -508,8 +509,8 @@ public class TestPhysicalPlanner {
LogicalNode rootNode = plan.getRootBlock().getRoot();
int numPartitions = 1;
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.HASH_PARTITION, numPartitions);
- dataChannel.setPartitionKey(new Column[]{});
+ ShuffleType.HASH_SHUFFLE, numPartitions);
+ dataChannel.setShuffleKeys(new Column[]{});
ctx.setDataChannel(dataChannel);
optimizer.optimize(plan);
@@ -773,8 +774,8 @@ public class TestPhysicalPlanner {
SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.RANGE_PARTITION);
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
+ TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+ channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
ctx.setDataChannel(channel);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 532bb81..987dc2a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -49,7 +49,7 @@ public class TestRepartitioner {
Collection<URI> uris = Repartitioner.
createHashFetchURL(hostName + ":" + port, sid, partitionId,
- TajoWorkerProtocol.PartitionType.HASH_PARTITION, intermediateEntries);
+ TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, intermediateEntries);
List<String> taList = TUtil.newList();
for (URI uri : uris) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 2d97e7a..9c21633 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -35,7 +35,7 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.IndexedStoreExec;
+import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec;
import org.apache.tajo.engine.planner.physical.MemSortExec;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.planner.physical.ProjectionExec;
@@ -147,7 +147,7 @@ public class TestRangeRetrieverHandler {
MemSortExec sort = (MemSortExec) proj.getChild();
SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
- IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort, sort.getSchema(),
+ RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sm, sort, sort.getSchema(),
sort.getSchema(), sortSpecs);
exec = idxStoreExec;
@@ -260,7 +260,7 @@ public class TestRangeRetrieverHandler {
ProjectionExec proj = (ProjectionExec) exec;
MemSortExec sort = (MemSortExec) proj.getChild();
SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
- IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort,
+ RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sm, sort,
sort.getSchema(), sort.getSchema(), sortSpecs);
exec = idxStoreExec;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 0f96e2c..837cb71 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -364,17 +364,17 @@ public class TajoPullServerService extends AbstractService {
final List<String> qids = params.get("qid");
final List<String> taskIdList = params.get("ta");
final List<String> subQueryIds = params.get("sid");
- final List<String> partitionIds = params.get("p");
+ final List<String> partIds = params.get("p");
if (types == null || taskIdList == null || subQueryIds == null || qids == null
- || partitionIds == null) {
- sendError(ctx, "Required queryId, type, taskIds, subquery Id, and partition id",
+ || partIds == null) {
+ sendError(ctx, "Required queryId, type, taskIds, subquery Id, and part id",
BAD_REQUEST);
return;
}
if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
- sendError(ctx, "Required qids, type, taskIds, subquery Id, and partition id",
+ sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id",
BAD_REQUEST);
return;
}
@@ -382,21 +382,21 @@ public class TajoPullServerService extends AbstractService {
final List<FileChunk> chunks = Lists.newArrayList();
String queryId = qids.get(0);
- String repartitionType = types.get(0);
+ String shuffleType = types.get(0);
String sid = subQueryIds.get(0);
- String partitionId = partitionIds.get(0);
+ String partId = partIds.get(0);
List<String> taskIds = splitMaps(taskIdList);
- LOG.info("PullServer request param: repartitionType=" + repartitionType +
- ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
+ LOG.info("PullServer request param: shuffleType=" + shuffleType +
+ ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
// the working dir of tajo worker for each query
String queryBaseDir = queryId.toString() + "/output";
LOG.info("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
- // if a subquery requires a range partitioning
- if (repartitionType.equals("r")) {
+ // if a subquery requires a range shuffle
+ if (shuffleType.equals("r")) {
String ta = taskIds.get(0);
Path path = localFS.makeQualified(
lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
@@ -417,18 +417,18 @@ public class TajoPullServerService extends AbstractService {
chunks.add(chunk);
}
- // if a subquery requires a hash repartition
- } else if (repartitionType.equals("h")) {
+ // if a subquery requires a hash shuffle
+ } else if (shuffleType.equals("h")) {
for (String ta : taskIds) {
Path path = localFS.makeQualified(
lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
- ta + "/output/" + partitionId, conf));
+ ta + "/output/" + partId, conf));
File file = new File(path.toUri());
FileChunk chunk = new FileChunk(file, 0, file.length());
chunks.add(chunk);
}
} else {
- LOG.error("Unknown repartition type: " + repartitionType);
+ LOG.error("Unknown shuffle type: " + shuffleType);
return;
}
@@ -482,16 +482,16 @@ public class TajoPullServerService extends AbstractService {
}
ChannelFuture writeFuture;
if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+ final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
file.startOffset, file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(partition);
+ writeFuture = ch.write(filePart);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,
// attribute to appropriate spill output
@Override
public void operationComplete(ChannelFuture future) {
- partition.releaseExternalResources();
+ filePart.releaseExternalResources();
}
});
} else {
[2/2] git commit: TAJO-464: Rename the name 'partition',
actually meaning shuffle to 'shuffle'. (hyunsik)
Posted by hy...@apache.org.
TAJO-464: Rename the name 'partition', actually meaning shuffle to 'shuffle'. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bb7e6b6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bb7e6b6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bb7e6b6b
Branch: refs/heads/master
Commit: bb7e6b6b3ca3f729e292df3b5905a46fe773f392
Parents: df5727c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Jan 2 17:10:05 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Jan 2 17:10:05 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../engine/planner/PhysicalPlannerImpl.java | 105 ++++++-------
.../tajo/engine/planner/global/DataChannel.java | 96 ++++++------
.../engine/planner/global/GlobalPlanner.java | 74 ++++++----
.../tajo/engine/planner/global/MasterPlan.java | 10 +-
.../planner/logical/PersistentStoreNode.java | 110 ++++++++++++++
.../planner/logical/ShuffleFileWriteNode.java | 114 ++++++++++++++
.../engine/planner/logical/StoreTableNode.java | 78 +---------
.../planner/physical/HashPartitioner.java | 6 +-
.../physical/HashShuffleFileWriteExec.java | 146 ++++++++++++++++++
.../planner/physical/IndexedStoreExec.java | 122 ---------------
.../planner/physical/PartitionedStoreExec.java | 148 -------------------
.../engine/planner/physical/Partitioner.java | 4 +-
.../physical/RangeShuffleFileWriteExec.java | 128 ++++++++++++++++
.../engine/planner/physical/StoreTableExec.java | 34 ++---
.../tajo/master/querymaster/QueryUnit.java | 10 +-
.../master/querymaster/QueryUnitAttempt.java | 11 +-
.../tajo/master/querymaster/Repartitioner.java | 38 ++---
.../tajo/master/querymaster/SubQuery.java | 4 +-
.../main/java/org/apache/tajo/worker/Task.java | 16 +-
.../apache/tajo/worker/TaskAttemptContext.java | 13 +-
.../src/main/proto/TajoWorkerProtocol.proto | 23 ++-
.../main/resources/webapps/worker/queryplan.jsp | 4 +-
.../main/resources/webapps/worker/queryunit.jsp | 6 +-
.../engine/planner/global/TestMasterPlan.java | 4 +-
.../planner/physical/TestPhysicalPlanner.java | 15 +-
.../apache/tajo/master/TestRepartitioner.java | 2 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 34 ++---
29 files changed, 764 insertions(+), 600 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cfc53e..f3e74ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -110,6 +110,9 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-464: Rename the name 'partition', actually meaning shuffle to
+ 'shuffle'. (hyunsik)
+
TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
TAJO-468: Implements task's detail info page in WEB UI.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5120106..d6d518c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -72,8 +72,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
try {
execPlan = createPlanRecursive(context, logicalPlan);
if (execPlan instanceof StoreTableExec
- || execPlan instanceof IndexedStoreExec
- || execPlan instanceof PartitionedStoreExec
+ || execPlan instanceof RangeShuffleFileWriteExec
+ || execPlan instanceof HashShuffleFileWriteExec
|| execPlan instanceof ColumnPartitionedTableStoreExec) {
return execPlan;
} else if (context.getDataChannel() != null) {
@@ -89,18 +89,15 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
- StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
- if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
- storeTableNode.setInSchema(plan.getOutSchema());
- storeTableNode.setOutSchema(plan.getOutSchema());
- if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
- storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
- } else {
- storeTableNode.setDefaultParition();
- }
- storeTableNode.setChild(plan);
-
- PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
+ ShuffleFileWriteNode shuffleFileWriteNode =
+ new ShuffleFileWriteNode(UNGENERATED_PID, channel.getTargetId().toString());
+ shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
+ shuffleFileWriteNode.setInSchema(plan.getOutSchema());
+ shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
+ shuffleFileWriteNode.setShuffle(channel.getShuffleType(), channel.getShuffleKeys(), channel.getShuffleOutputNum());
+ shuffleFileWriteNode.setChild(plan);
+
+ PhysicalExec outExecPlan = createShuffleFileWritePlan(context, shuffleFileWriteNode, execPlan);
return outExecPlan;
}
@@ -606,50 +603,56 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- public PhysicalExec createStorePlan(TaskAttemptContext ctx,
- StoreTableNode plan, PhysicalExec subOp) throws IOException {
- if (plan.getPartitionType() == PartitionType.HASH_PARTITION
- || plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
- switch (ctx.getDataChannel().getPartitionType()) {
- case HASH_PARTITION:
- return new PartitionedStoreExec(ctx, sm, plan, subOp);
-
- case RANGE_PARTITION:
- SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
-
- SortSpec [] sortSpecs = null;
- if (sortExec != null) {
- sortSpecs = sortExec.getSortSpecs();
- } else {
- Column[] columns = ctx.getDataChannel().getPartitionKey();
- SortSpec specs[] = new SortSpec[columns.length];
- for (int i = 0; i < columns.length; i++) {
- specs[i] = new SortSpec(columns[i]);
- }
- }
-
- return new IndexedStoreExec(ctx, sm, subOp,
- plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+ /**
+ * Create a shuffle file write executor to store intermediate data into local disks.
+ */
+ public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx,
+ ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException {
+ switch (plan.getShuffleType()) {
+ case HASH_SHUFFLE:
+ return new HashShuffleFileWriteExec(ctx, sm, plan, subOp);
+
+ case RANGE_SHUFFLE:
+ SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
+
+ SortSpec [] sortSpecs = null;
+ if (sortExec != null) {
+ sortSpecs = sortExec.getSortSpecs();
+ } else {
+ Column[] columns = ctx.getDataChannel().getShuffleKeys();
+ SortSpec specs[] = new SortSpec[columns.length];
+ for (int i = 0; i < columns.length; i++) {
+ specs[i] = new SortSpec(columns[i]);
+ }
}
+ return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+ case NONE_SHUFFLE:
+ return new StoreTableExec(ctx, plan, subOp);
+
+ default:
+ throw new IllegalStateException(ctx.getDataChannel().getShuffleType() + " is not supported yet.");
}
- if (plan instanceof StoreIndexNode) {
- return new TunnelExec(ctx, plan.getOutSchema(), subOp);
- }
+ }
+
+ /**
+ * Create a executor to store a table into HDFS. This is used for CREATE TABLE ..
+ * AS or INSERT (OVERWRITE) INTO statement.
+ */
+ public PhysicalExec createStorePlan(TaskAttemptContext ctx,
+ StoreTableNode plan, PhysicalExec subOp) throws IOException {
- // Find partitioned table
if (plan.getPartitions() != null) {
- if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+ switch (plan.getPartitions().getPartitionsType()) {
+ case COLUMN:
return new ColumnPartitionedTableStoreExec(ctx, plan, subOp);
- } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.HASH)) {
- // TODO
- } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
- // TODO
- } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
- // TODO
+ default:
+ throw new IllegalStateException(plan.getPartitions().getPartitionsType() + " is not supported yet.");
}
+ } else {
+ return new StoreTableExec(ctx, plan, subOp);
}
-
- return new StoreTableExec(ctx, plan, subOp);
}
public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 556c7ff..efa1e05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -30,9 +30,9 @@ public class DataChannel {
private ExecutionBlockId srcId;
private ExecutionBlockId targetId;
private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
- private PartitionType partitionType;
- private Integer partitionNum = 1;
- private Column[] key;
+ private ShuffleType shuffleType;
+ private Integer numOutputs = 1;
+ private Column[] shuffleKeys;
private Schema schema;
@@ -43,39 +43,39 @@ public class DataChannel {
this.targetId = targetId;
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType) {
this(srcId, targetId);
- this.partitionType = partitionType;
+ this.shuffleType = shuffleType;
}
- public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
- this(src.getId(), target.getId(), partitionType, partNum);
+ public DataChannel(ExecutionBlock src, ExecutionBlock target, ShuffleType shuffleType, int numOutput) {
+ this(src.getId(), target.getId(), shuffleType, numOutput);
setSchema(src.getPlan().getOutSchema());
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
- this(srcId, targetId, partitionType);
- this.partitionNum = partNum;
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType, int numOutputs) {
+ this(srcId, targetId, shuffleType);
+ this.numOutputs = numOutputs;
}
public DataChannel(DataChannelProto proto) {
this.srcId = new ExecutionBlockId(proto.getSrcId());
this.targetId = new ExecutionBlockId(proto.getTargetId());
this.transmitType = proto.getTransmitType();
- this.partitionType = proto.getPartitionType();
+ this.shuffleType = proto.getShuffleType();
if (proto.hasSchema()) {
this.setSchema(new Schema(proto.getSchema()));
}
- if (proto.getPartitionKeyCount() > 0) {
- key = new Column[proto.getPartitionKeyCount()];
- for (int i = 0; i < proto.getPartitionKeyCount(); i++) {
- key[i] = new Column(proto.getPartitionKey(i));
+ if (proto.getShuffleKeysCount() > 0) {
+ shuffleKeys = new Column[proto.getShuffleKeysCount()];
+ for (int i = 0; i < proto.getShuffleKeysCount(); i++) {
+ shuffleKeys[i] = new Column(proto.getShuffleKeys(i));
}
} else {
- key = new Column[] {};
+ shuffleKeys = new Column[] {};
}
- if (proto.hasPartitionNum()) {
- this.partitionNum = proto.getPartitionNum();
+ if (proto.hasNumOutputs()) {
+ this.numOutputs = proto.getNumOutputs();
}
if (proto.hasStoreType()) {
@@ -91,8 +91,8 @@ public class DataChannel {
return targetId;
}
- public PartitionType getPartitionType() {
- return partitionType;
+ public ShuffleType getShuffleType() {
+ return shuffleType;
}
public TransmitType getTransmitType() {
@@ -103,37 +103,37 @@ public class DataChannel {
this.transmitType = transmitType;
}
- public void setPartition(PartitionType partitionType, Column [] keys, int numPartitions) {
- Preconditions.checkArgument(keys.length >= 0, "At least one partition key must be specified.");
- Preconditions.checkArgument(numPartitions > 0, "The number of partitions must be positive: %s", numPartitions);
+ public void setShuffle(ShuffleType shuffleType, Column[] keys, int numOutputs) {
+ Preconditions.checkNotNull(keys, "Shuffle keys must not be null."); // empty key arrays are accepted; only null is rejected
+ Preconditions.checkArgument(numOutputs > 0, "The number of outputs must be positive: %s", numOutputs);
- this.partitionType = partitionType;
- this.key = keys;
- this.partitionNum = numPartitions;
+ this.shuffleType = shuffleType;
+ this.shuffleKeys = keys;
+ this.numOutputs = numOutputs;
}
- public void setPartitionType(PartitionType partitionType) {
- this.partitionType = partitionType;
+ public void setShuffleType(ShuffleType shuffleType) {
+ this.shuffleType = shuffleType;
}
- public boolean hasPartitionKey() {
- return key != null;
+ public boolean hasShuffleKeys() {
+ return shuffleKeys != null;
}
- public void setPartitionKey(Column [] key) {
- this.key = key;
+ public void setShuffleKeys(Column[] keys) { // sets the shuffle key columns (may be null or empty)
+ this.shuffleKeys = keys;
}
- public Column [] getPartitionKey() {
- return this.key;
+ public Column [] getShuffleKeys() {
+ return this.shuffleKeys;
}
- public void setPartitionNum(int partNum) {
- this.partitionNum = partNum;
+ public void setShuffleOutputNum(int numOutputs) { // sets the number of shuffle outputs
+ this.numOutputs = numOutputs;
}
- public int getPartitionNum() {
- return partitionNum;
+ public int getShuffleOutputNum() { // NOTE(review): numOutputs is nullable (see the null check in getProto); unboxing here throws NPE if it was never set
+ return numOutputs;
}
public boolean hasStoreType() {
@@ -155,17 +155,17 @@ public class DataChannel {
if (transmitType != null) {
builder.setTransmitType(transmitType);
}
- builder.setPartitionType(partitionType);
+ builder.setShuffleType(shuffleType);
if (schema != null) {
builder.setSchema(schema.getProto());
}
- if (key != null) {
- for (Column column : key) {
- builder.addPartitionKey(column.getProto());
+ if (shuffleKeys != null) {
+ for (Column column : shuffleKeys) {
+ builder.addShuffleKeys(column.getProto());
}
}
- if (partitionNum != null) {
- builder.setPartitionNum(partitionNum);
+ if (numOutputs != null) {
+ builder.setNumOutputs(numOutputs);
}
if(storeType != null){
@@ -186,11 +186,11 @@ public class DataChannel {
StringBuilder sb = new StringBuilder();
sb.append("[").append(srcId.getQueryId()).append("] ");
sb.append(srcId.getId()).append(" => ").append(targetId.getId());
- sb.append(" (type=").append(partitionType);
- if (hasPartitionKey()) {
+ sb.append(" (type=").append(shuffleType);
+ if (hasShuffleKeys()) {
sb.append(", key=");
boolean first = true;
- for (Column column : getPartitionKey()) {
+ for (Column column : getShuffleKeys()) {
if (first) {
first = false;
} else {
@@ -198,7 +198,7 @@ public class DataChannel {
}
sb.append(column.getColumnName());
}
- sb.append(", num=").append(partitionNum);
+ sb.append(", num=").append(numOutputs);
}
sb.append(")");
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index b7e1ddb..abe6af3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -39,7 +39,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
import java.util.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
/**
* Build DAG
@@ -72,27 +72,43 @@ public class GlobalPlanner {
globalPlanContext.plan = masterPlan;
LOG.info(masterPlan.getLogicalPlan());
+ // copy a logical plan in order to keep the original logical plan. The distributed planner can modify
+ // an input logical plan.
LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan(),
masterPlan.getLogicalPlan().getRootBlock().getRoot());
+
+ // create a distributed execution plan by visiting each logical node.
+ // Its output is a graph, where each vertex is an execution block, and each edge is a data channel.
+ // MasterPlan contains them.
LogicalNode lastNode = planner.visit(globalPlanContext,
masterPlan.getLogicalPlan(), masterPlan.getLogicalPlan().getRootBlock(), inputPlan, new Stack<LogicalNode>());
-
ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
+ ExecutionBlock terminalBlock;
+ // TODO - consider two terminal types: specified output or not
if (childExecBlock.getPlan() != null) {
- ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
- DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
- dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
- dataChannel.setSchema(lastNode.getOutSchema());
- masterPlan.addConnect(dataChannel);
- masterPlan.setTerminal(terminalBlock);
- } else {
- masterPlan.setTerminal(childExecBlock);
+ terminalBlock = masterPlan.createTerminalBlock();
+ DataChannel finalChannel = new DataChannel(childExecBlock.getId(), terminalBlock.getId());
+ setFinalOutputChannel(finalChannel, lastNode.getOutSchema());
+ masterPlan.addConnect(finalChannel);
+ } else { // if one or more unions is terminal
+ terminalBlock = childExecBlock;
+ for (DataChannel outputChannel : masterPlan.getIncomingChannels(terminalBlock.getId())) {
+ setFinalOutputChannel(outputChannel, lastNode.getOutSchema());
+ }
}
+ masterPlan.setTerminal(terminalBlock);
LOG.info(masterPlan);
}
+ private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) { // configures a channel into the terminal block as the final query output
+ outputChannel.setShuffleType(NONE_SHUFFLE); // the final output is not shuffled
+ outputChannel.setShuffleOutputNum(1); // a single output
+ outputChannel.setStoreType(CatalogProtos.StoreType.CSV);
+ outputChannel.setSchema(outputSchema);
+ }
+
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -105,15 +121,15 @@ public class GlobalPlanner {
ExecutionBlock parent, JoinNode join, boolean leftTable) {
ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
- DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+ DataChannel channel = new DataChannel(childBlock, parent, HASH_SHUFFLE, 32);
channel.setStoreType(storeType);
if (join.getJoinType() != JoinType.CROSS) {
Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
if (leftTable) {
- channel.setPartitionKey(joinColumns[0]);
+ channel.setShuffleKeys(joinColumns[0]);
} else {
- channel.setPartitionKey(joinColumns[1]);
+ channel.setShuffleKeys(joinColumns[1]);
}
}
return channel;
@@ -218,8 +234,8 @@ public class GlobalPlanner {
// setup channel
DataChannel channel;
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(groupbyNode.getGroupingColumns());
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+ channel.setShuffleKeys(groupbyNode.getGroupingColumns());
channel.setSchema(topMostOfFirstPhase.getOutSchema());
channel.setStoreType(storeType);
@@ -251,9 +267,9 @@ public class GlobalPlanner {
currentBlock = childBlock;
for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
if (firstPhaseGroupBy.isEmptyGrouping()) {
- dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 1);
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
} else {
- dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 32);
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
}
dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
@@ -273,11 +289,11 @@ public class GlobalPlanner {
DataChannel channel;
if (firstPhaseGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
- channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+ channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
} else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+ channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
}
channel.setSchema(firstPhaseGroupBy.getOutSchema());
channel.setStoreType(storeType);
@@ -306,8 +322,8 @@ public class GlobalPlanner {
childBlock.setPlan(firstSortNode);
currentBlock = masterPlan.newExecutionBlock();
- DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
+ DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_SHUFFLE, 32);
+ channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
channel.setSchema(firstSortNode.getOutSchema());
channel.setStoreType(storeType);
@@ -348,9 +364,9 @@ public class GlobalPlanner {
DataChannel channel = null;
CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
Column[] columns = new Column[partitionDesc.getColumns().size()];
- channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
+ channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else {
@@ -409,15 +425,15 @@ public class GlobalPlanner {
childBlock.setPlan(childLimit);
DataChannel channel = context.plan.getChannel(childBlock, execBlock);
- channel.setPartitionNum(1);
+ channel.setShuffleOutputNum(1);
context.execBlockMap.put(node.getPID(), execBlock);
} else {
node.setChild(execBlock.getPlan());
execBlock.setPlan(node);
ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
- DataChannel newChannel = new DataChannel(execBlock, newExecBlock, HASH_PARTITION, 1);
- newChannel.setPartitionKey(new Column[]{});
+ DataChannel newChannel = new DataChannel(execBlock, newExecBlock, HASH_SHUFFLE, 1);
+ newChannel.setShuffleKeys(new Column[]{});
newChannel.setSchema(node.getOutSchema());
newChannel.setStoreType(storeType);
@@ -525,7 +541,7 @@ public class GlobalPlanner {
}
for (ExecutionBlock childBlocks : queryBlockBlocks) {
- DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+ DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_SHUFFLE, 1);
channel.setStoreType(storeType);
context.plan.addConnect(channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 891b452..2ac2bc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -26,6 +26,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,8 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
public class MasterPlan {
private final QueryId queryId;
private final QueryContext context;
@@ -44,7 +43,8 @@ public class MasterPlan {
private ExecutionBlock terminalBlock;
private Map<ExecutionBlockId, ExecutionBlock> execBlockMap = new HashMap<ExecutionBlockId, ExecutionBlock>();
- private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph = new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
+ private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
+ new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
public ExecutionBlockId newExecutionBlockId() {
return new ExecutionBlockId(queryId, nextId.incrementAndGet());
@@ -108,11 +108,11 @@ public class MasterPlan {
execBlockGraph.addEdge(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
}
- public void addConnect(ExecutionBlock src, ExecutionBlock target, PartitionType type) {
+ public void addConnect(ExecutionBlock src, ExecutionBlock target, TajoWorkerProtocol.ShuffleType type) {
addConnect(src.getId(), target.getId(), type);
}
- public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
+ public void addConnect(ExecutionBlockId src, ExecutionBlockId target, TajoWorkerProtocol.ShuffleType type) {
addConnect(new DataChannel(src, target, type));
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
new file mode 100644
index 0000000..2f1a487
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+
+/**
+ * <code>PersistentStoreNode</code> is the base of logical nodes that materialize the output of
+ * their child. It holds the information common to such steps: the name of the table to store
+ * into, the storage type, and optional storage options.
+ */
+public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
+ /** the name of the table (or output) the data is stored into */
+ @Expose protected String tableName;
+ /** the storage type of the output; CSV unless changed by {@link #setStorageType} */
+ @Expose protected CatalogProtos.StoreType storageType = CatalogProtos.StoreType.CSV;
+ /** storage options; null when none are set */
+ @Expose protected Options options;
+
+ /**
+ * @param pid the id of this logical node
+ * @param tableName the name of the table (or output) to store into
+ */
+ public PersistentStoreNode(int pid, String tableName) {
+ super(pid, NodeType.STORE);
+ this.tableName = tableName;
+ }
+
+ /** @return the name of the table (or output) to store into */
+ public final String getTableName() {
+ return this.tableName;
+ }
+
+ /** @param storageType the storage type of the output */
+ public void setStorageType(CatalogProtos.StoreType storageType) {
+ this.storageType = storageType;
+ }
+
+ /** @return the storage type of the output */
+ public CatalogProtos.StoreType getStorageType() {
+ return this.storageType;
+ }
+
+ /** @return true if storage options are set */
+ public boolean hasOptions() {
+ return this.options != null;
+ }
+
+ /** @return the storage options, or null if none are set */
+ public Options getOptions() {
+ return this.options;
+ }
+
+ @Override
+ public PlanString getPlanString() {
+ PlanString planStr = new PlanString("Store");
+ planStr.appendTitle(" into ").appendTitle(tableName);
+ planStr.addExplan("Store type: " + storageType);
+
+ return planStr;
+ }
+
+ /**
+ * Two store nodes are equal when the parent class considers them equal and their table names,
+ * storage types, and options are equal. The storage type is compared by identity, so a null
+ * storage type (accepted by {@link #setStorageType}) no longer causes a NullPointerException.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PersistentStoreNode) {
+ PersistentStoreNode other = (PersistentStoreNode) obj;
+ boolean eq = super.equals(other);
+ eq = eq && TUtil.checkEquals(tableName, other.tableName);
+ eq = eq && this.storageType == other.storageType; // enum constants: identity comparison
+ eq = eq && TUtil.checkEquals(options, other.options);
+ return eq;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ PersistentStoreNode store = (PersistentStoreNode) super.clone();
+ // tableName (immutable String) and storageType (enum) are already copied by super.clone();
+ // only the mutable options need a copy of their own.
+ store.options = options != null ? (Options) options.clone() : null;
+ return store;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\"Store\": {\"table\": \"").append(tableName).append("\"");
+ if (storageType != null) {
+ sb.append(", \"storage\": \"").append(storageType.name()).append("\"");
+ }
+
+ sb.append(",\n \"out schema\": ").append(getOutSchema()).append(",")
+ .append("\n \"in schema\": ").append(getInSchema());
+
+ sb.append("}");
+
+ return sb.toString() + "\n"
+ + getChild().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
new file mode 100644
index 0000000..180b1a2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.NONE_SHUFFLE;
+
+/**
+ * <code>ShuffleFileWriteNode</code> is an expression for an intermediate data materialization step.
+ * In addition to the store information inherited from {@link PersistentStoreNode}, it carries the
+ * shuffle type, the number of file outputs, and the shuffle keys.
+ */
+public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneable {
+ /** the shuffle type; NONE_SHUFFLE until {@link #setShuffle} is called */
+ @Expose private TajoWorkerProtocol.ShuffleType shuffleType = NONE_SHUFFLE;
+ /** the number of file outputs; 0 until {@link #setShuffle} is called */
+ @Expose private int numOutputs;
+ /** the shuffle key columns; null until {@link #setShuffle} is called */
+ @Expose private Column [] shuffleKeys;
+
+ public ShuffleFileWriteNode(int pid, String tableName) {
+ super(pid, tableName);
+ }
+
+ /** @return the number of file outputs */
+ public final int getNumOutputs() {
+ return this.numOutputs;
+ }
+
+ /** @return true if shuffle keys have been set; the array itself may still be empty */
+ public final boolean hasShuffleKeys() {
+ return this.shuffleKeys != null;
+ }
+
+ /** @return the shuffle key columns, or null if they have not been set */
+ public final Column [] getShuffleKeys() {
+ return shuffleKeys;
+ }
+
+ /**
+ * Sets how the output of this node is shuffled.
+ *
+ * @param type the shuffle type
+ * @param keys the shuffle key columns; must not be null, but may be empty
+ * @param numPartitions the number of file outputs; must be positive
+ * @throws NullPointerException if keys is null
+ * @throws IllegalArgumentException if numPartitions is not positive
+ */
+ public final void setShuffle(TajoWorkerProtocol.ShuffleType type, Column[] keys, int numPartitions) {
+ // keys.length >= 0 is always true, so the former check only rejected null (via an implicit NPE).
+ // Reject null explicitly and keep accepting an empty key array.
+ Preconditions.checkNotNull(keys, "Shuffle keys must not be null.");
+ Preconditions.checkArgument(numPartitions > 0,
+ "The number of partitions must be positive: %s", numPartitions);
+
+ this.shuffleType = type;
+ this.shuffleKeys = keys;
+ this.numOutputs = numPartitions;
+ }
+
+ /** @return the shuffle type */
+ public TajoWorkerProtocol.ShuffleType getShuffleType() {
+ return this.shuffleType;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ShuffleFileWriteNode) {
+ ShuffleFileWriteNode other = (ShuffleFileWriteNode) obj;
+ boolean eq = super.equals(other);
+ eq = eq && this.shuffleType == other.shuffleType;
+ eq = eq && this.numOutputs == other.numOutputs;
+ eq = eq && TUtil.checkEquals(shuffleKeys, other.shuffleKeys);
+ return eq;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ ShuffleFileWriteNode store = (ShuffleFileWriteNode) super.clone();
+ // shuffleType and numOutputs are copied by super.clone(); copy the key array so that the
+ // clone does not share it with this node.
+ store.shuffleKeys = shuffleKeys != null ? shuffleKeys.clone() : null;
+ return store;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\"Store\": {\"table\": \"").append(tableName).append("\"");
+ if (storageType != null) {
+ sb.append(", \"storage\": \"").append(storageType.name()).append("\"");
+ }
+ sb.append(", \"shuffle type\": \"").append(shuffleType).append("\"");
+ sb.append(", \"num outputs\": ").append(numOutputs);
+ if (shuffleKeys != null) {
+ sb.append(", \"shuffle keys\": [");
+ for (int i = 0; i < shuffleKeys.length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(shuffleKeys[i]);
+ }
+ sb.append("]");
+ }
+
+ sb.append(",\n \"out schema\": ").append(getOutSchema()).append(",")
+ .append("\n \"in schema\": ").append(getInSchema());
+
+ sb.append("}");
+
+ return sb.toString() + "\n" + getChild().toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 634fa3a..843a70f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -18,44 +18,28 @@
package org.apache.tajo.engine.planner.logical;
-import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.LIST_PARTITION;
-
-public class StoreTableNode extends UnaryNode implements Cloneable {
- @Expose private String tableName;
- @Expose private StoreType storageType = StoreType.CSV;
- @Expose private PartitionType partitionType;
- @Expose private int numPartitions;
- @Expose private Column [] partitionKeys;
- @Expose private Options options;
+
+public class StoreTableNode extends PersistentStoreNode implements Cloneable {
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
@Expose private PartitionDesc partitionDesc;
public StoreTableNode(int pid, String tableName) {
- super(pid, NodeType.STORE);
- this.tableName = tableName;
+ super(pid, tableName);
}
public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
- super(pid, NodeType.STORE);
- this.tableName = tableName;
+ super(pid, tableName);
this.partitionDesc = partitionDesc;
}
- public final String getTableName() {
- return this.tableName;
- }
-
public void setStorageType(StoreType storageType) {
this.storageType = storageType;
}
@@ -63,39 +47,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
public StoreType getStorageType() {
return this.storageType;
}
-
- public final int getNumPartitions() {
- return this.numPartitions;
- }
-
- public final boolean hasPartitionKey() {
- return this.partitionKeys != null;
- }
-
- public final Column [] getPartitionKeys() {
- return this.partitionKeys;
- }
-
- public final void setDefaultParition() {
- this.partitionType = LIST_PARTITION;
- this.partitionKeys = null;
- this.numPartitions = 1;
- }
-
- public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
- Preconditions.checkArgument(keys.length >= 0,
- "At least one partition key must be specified.");
- Preconditions.checkArgument(numPartitions > 0,
- "The number of partitions must be positive: %s", numPartitions);
-
- this.partitionType = type;
- this.partitionKeys = keys;
- this.numPartitions = numPartitions;
- }
-
- public PartitionType getPartitionType() {
- return this.partitionType;
- }
public boolean hasOptions() {
return this.options != null;
@@ -139,11 +90,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
if (obj instanceof StoreTableNode) {
StoreTableNode other = (StoreTableNode) obj;
boolean eq = super.equals(other);
- eq = eq && this.tableName.equals(other.tableName);
- eq = eq && this.storageType.equals(other.storageType);
- eq = eq && this.numPartitions == other.numPartitions;
- eq = eq && TUtil.checkEquals(partitionKeys, other.partitionKeys);
- eq = eq && TUtil.checkEquals(options, other.options);
eq = eq && isCreatedTable == other.isCreatedTable;
eq = eq && isOverwritten == other.isOverwritten;
eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
@@ -156,11 +102,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
StoreTableNode store = (StoreTableNode) super.clone();
- store.tableName = tableName;
- store.storageType = storageType != null ? storageType : null;
- store.numPartitions = numPartitions;
- store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
- store.options = options != null ? (Options) options.clone() : null;
store.isCreatedTable = isCreatedTable;
store.isOverwritten = isOverwritten;
store.partitionDesc = partitionDesc;
@@ -173,17 +114,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
if (storageType != null) {
sb.append(", storage: "+ storageType.name());
}
- sb.append(", partnum: ").append(numPartitions).append("}")
- .append(", ");
- if (partitionKeys != null) {
- sb.append("\"partition keys: [");
- for (int i = 0; i < partitionKeys.length; i++) {
- sb.append(partitionKeys[i]);
- if (i < partitionKeys.length - 1)
- sb.append(",");
- }
- sb.append("],");
- }
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
index 485e0d1..b620b22 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
@@ -29,14 +29,14 @@ public class HashPartitioner extends Partitioner {
public HashPartitioner(final int [] keys, final int numPartitions) {
super(keys, numPartitions);
- this.keyTuple = new VTuple(partitionKeys.length);
+ this.keyTuple = new VTuple(partitionKeyIds.length);
}
@Override
public int getPartition(Tuple tuple) {
// build one key tuple
- for (int i = 0; i < partitionKeys.length; i++) {
- keyTuple.put(i, tuple.get(partitionKeys[i]));
+ for (int i = 0; i < partitionKeyIds.length; i++) {
+ keyTuple.put(i, tuple.get(partitionKeyIds[i]));
}
return (keyTuple.hashCode() & Integer.MAX_VALUE) %
(numPartitions == 32 ? numPartitions-1 : numPartitions);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
new file mode 100644
index 0000000..c09ec19
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle keys. The file outputs are stored on local disks.
+ *
+ * <p>Each input tuple is routed by a {@link HashPartitioner} over the shuffle key columns to a shuffle output
+ * id, and each id gets its own file under <code>{workDir}/output</code>, opened lazily on first use. All input
+ * is consumed by a single call to {@link #next()}, which then closes every output, registers the non-empty
+ * ones with the {@link TaskAttemptContext} and sets the aggregated statistics as the task result.
+ */
+public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
+  private final ShuffleFileWriteNode plan;
+  /** table meta (storage type and options) shared by every shuffle output file */
+  private final TableMeta meta;
+  /** maps each input tuple to the id of the shuffle output it belongs to */
+  private final Partitioner partitioner;
+  /** local directory ({workDir}/output) holding every shuffle output file */
+  private final Path storeTablePath;
+  /** appenders opened so far, keyed by shuffle output id */
+  private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+  /** the number of shuffle outputs requested by the plan */
+  private final int numShuffleOutputs;
+  /** column ids of the shuffle keys within the input schema */
+  private final int [] shuffleKeyIds;
+
+  /**
+   * @param context the task attempt whose work directory receives the shuffle output files
+   * @param sm not referenced by this executor; appenders are obtained through StorageManagerFactory
+   * @param plan the shuffle write plan; it must have shuffle keys
+   * @param child the executor producing the tuples to shuffle
+   * @throws IllegalArgumentException if the plan has no shuffle keys
+   */
+  public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
+                                  final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    Preconditions.checkArgument(plan.hasShuffleKeys());
+    this.plan = plan;
+    if (plan.hasOptions()) {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+
+    // resolve the shuffle key columns to their positions in the input schema
+    this.numShuffleOutputs = this.plan.getNumOutputs();
+    Column [] shuffleKeys = this.plan.getShuffleKeys();
+    this.shuffleKeyIds = new int [shuffleKeys.length];
+    for (int i = 0; i < shuffleKeys.length; i++) {
+      shuffleKeyIds[i] = inSchema.getColumnId(shuffleKeys[i].getQualifiedName());
+    }
+    this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
+    storeTablePath = new Path(context.getWorkDir(), "output");
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    // create the output directory up front; appenders are opened lazily in getAppender()
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+  }
+
+  /**
+   * Returns the appender of the given shuffle output, creating and initializing it on first use.
+   */
+  private Appender getAppender(int partId) throws IOException {
+    Appender appender = appenderMap.get(partId);
+    if (appender != null) {
+      return appender;
+    }
+
+    Path dataFile = getDataFile(partId);
+    FileSystem fs = dataFile.getFileSystem(context.getConf());
+    if (fs.exists(dataFile)) {
+      // an existing file is only logged here; nothing is deleted before the appender is opened
+      LOG.info("File " + dataFile + " already exists!");
+      FileStatus status = fs.getFileStatus(dataFile);
+      LOG.info("File size: " + status.getLen());
+    }
+    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+    appender.enableStats();
+    appender.init();
+    appenderMap.put(partId, appender);
+    return appender;
+  }
+
+  /** Returns the path of the output file for the given shuffle output id; the file is named after the id. */
+  private Path getDataFile(int partId) {
+    return StorageUtil.concatPath(storeTablePath, Integer.toString(partId));
+  }
+
+  /**
+   * Writes every tuple of the child to the shuffle output chosen by its shuffle keys, then closes all
+   * outputs, registers each non-empty one with the task context and sets the aggregated statistics.
+   *
+   * @return always <code>null</code>; this executor emits no tuples of its own
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while ((tuple = child.next()) != null) {
+      getAppender(partitioner.getPartition(tuple)).addTuple(tuple);
+    }
+
+    List<TableStats> statSet = new ArrayList<TableStats>();
+    for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
+      int partId = entry.getKey();
+      Appender app = entry.getValue();
+      app.flush();
+      app.close();
+      TableStats stats = app.getStats();
+      statSet.add(stats);
+      // only outputs that received at least one row are registered
+      if (stats.getNumRows() > 0) {
+        context.addShuffleFileOutput(partId, getDataFile(partId).getName());
+      }
+    }
+
+    // Collect and aggregate statistics data
+    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
+    context.setResultStats(aggregated);
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
deleted file mode 100644
index afb4d3c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class IndexedStoreExec extends UnaryPhysicalExec {
- private static Log LOG = LogFactory.getLog(IndexedStoreExec.class);
- private final SortSpec[] sortSpecs;
- private int [] indexKeys = null;
- private Schema keySchema;
-
- private BSTIndex.BSTIndexWriter indexWriter;
- private TupleComparator comp;
- private FileAppender appender;
- private TableMeta meta;
-
- public IndexedStoreExec(final TaskAttemptContext context, final AbstractStorageManager sm,
- final PhysicalExec child, final Schema inSchema, final Schema outSchema,
- final SortSpec[] sortSpecs) throws IOException {
- super(context, inSchema, outSchema, child);
- this.sortSpecs = sortSpecs;
- }
-
- public void init() throws IOException {
- super.init();
-
- indexKeys = new int[sortSpecs.length];
- keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-
- Column col;
- for (int i = 0 ; i < sortSpecs.length; i++) {
- col = sortSpecs[i].getSortKey();
- indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
- }
-
- BSTIndex bst = new BSTIndex(new TajoConf());
- this.comp = new TupleComparator(keySchema, sortSpecs);
- Path storeTablePath = new Path(context.getWorkDir(), "output");
- LOG.info("Output data directory: " + storeTablePath);
- this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
- context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
- FileSystem fs = new RawLocalFileSystem();
- fs.mkdirs(storeTablePath);
- this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
- outSchema, new Path(storeTablePath, "output"));
- this.appender.enableStats();
- this.appender.init();
- this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- this.indexWriter.setLoadNum(100);
- this.indexWriter.open();
- }
-
- @Override
- public Tuple next() throws IOException {
- Tuple tuple;
- Tuple keyTuple;
- Tuple prevKeyTuple = null;
- long offset;
-
-
- while((tuple = child.next()) != null) {
- offset = appender.getOffset();
- appender.addTuple(tuple);
- keyTuple = new VTuple(keySchema.getColumnNum());
- RowStoreUtil.project(tuple, keyTuple, indexKeys);
- if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
- indexWriter.write(keyTuple, offset);
- prevKeyTuple = keyTuple;
- }
- }
-
- return null;
- }
-
- @Override
- public void rescan() throws IOException {
- }
-
- public void close() throws IOException {
- super.close();
-
- appender.flush();
- appender.close();
- indexWriter.flush();
- indexWriter.close();
-
- // Collect statistics data
- context.setResultStats(appender.getStats());
- context.addRepartition(0, context.getTaskId().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
deleted file mode 100644
index bcea189..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class PartitionedStoreExec extends UnaryPhysicalExec {
- private static Log LOG = LogFactory.getLog(PartitionedStoreExec.class);
- private static final NumberFormat numFormat = NumberFormat.getInstance();
-
- static {
- numFormat.setGroupingUsed(false);
- numFormat.setMinimumIntegerDigits(6);
- }
-
- private final StoreTableNode plan;
-
- private final int numPartitions;
- private final int [] partitionKeys;
-
- private final TableMeta meta;
- private final Partitioner partitioner;
- private final Path storeTablePath;
- private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
-
- public PartitionedStoreExec(TaskAttemptContext context, final AbstractStorageManager sm,
- final StoreTableNode plan, final PhysicalExec child) throws IOException {
- super(context, plan.getInSchema(), plan.getOutSchema(), child);
- Preconditions.checkArgument(plan.hasPartitionKey());
- this.plan = plan;
- this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
- // about the partitions
- this.numPartitions = this.plan.getNumPartitions();
- int i = 0;
- this.partitionKeys = new int [this.plan.getPartitionKeys().length];
- for (Column key : this.plan.getPartitionKeys()) {
- partitionKeys[i] = inSchema.getColumnId(key.getQualifiedName());
- i++;
- }
- this.partitioner = new HashPartitioner(partitionKeys, numPartitions);
- storeTablePath = new Path(context.getWorkDir(), "output");
- }
-
- @Override
- public void init() throws IOException {
- super.init();
- FileSystem fs = new RawLocalFileSystem();
- fs.mkdirs(storeTablePath);
- }
-
- private Appender getAppender(int partition) throws IOException {
- Appender appender = appenderMap.get(partition);
-
- if (appender == null) {
- Path dataFile = getDataFile(partition);
- FileSystem fs = dataFile.getFileSystem(context.getConf());
- if (fs.exists(dataFile)) {
- LOG.info("File " + dataFile + " already exists!");
- FileStatus status = fs.getFileStatus(dataFile);
- LOG.info("File size: " + status.getLen());
- }
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
- appender.enableStats();
- appender.init();
- appenderMap.put(partition, appender);
- } else {
- appender = appenderMap.get(partition);
- }
-
- return appender;
- }
-
- private Path getDataFile(int partition) {
- return StorageUtil.concatPath(storeTablePath, ""+partition);
- }
-
- @Override
- public Tuple next() throws IOException {
- Tuple tuple;
- Appender appender;
- int partition;
- while ((tuple = child.next()) != null) {
- partition = partitioner.getPartition(tuple);
- appender = getAppender(partition);
- appender.addTuple(tuple);
- }
-
- List<TableStats> statSet = new ArrayList<TableStats>();
- for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
- int partNum = entry.getKey();
- Appender app = entry.getValue();
- app.flush();
- app.close();
- statSet.add(app.getStats());
- if (app.getStats().getNumRows() > 0) {
- context.addRepartition(partNum, getDataFile(partNum).getName());
- }
- }
-
- // Collect and aggregated statistics data
- TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
- context.setResultStats(aggregated);
-
- return null;
- }
-
- @Override
- public void rescan() throws IOException {
- // nothing to do
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
index 866738d..b67f45c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
@@ -25,7 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.tajo.storage.Tuple;
public abstract class Partitioner {
- protected final int [] partitionKeys;
+ protected final int [] partitionKeyIds; // positions of the partition key columns within the input tuples
protected final int numPartitions;
public Partitioner(final int [] keyList, final int numPartitions) {
@@ -35,7 +35,7 @@ public abstract class Partitioner {
"At least one partition key must be specified.");
Preconditions.checkArgument(numPartitions > 0,
"The number of partitions must be positive: %s", numPartitions);
- this.partitionKeys = keyList;
+ this.partitionKeyIds = keyList;
this.numPartitions = numPartitions;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
new file mode 100644
index 0000000..13573eb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * <code>RangeShuffleFileWriteExec</code> is a physical executor to store intermediate data for a range shuffle.
+ * All input tuples are appended to a single data file (<code>{workDir}/output/output</code>) on local disk, and a
+ * two-level BST index (<code>{workDir}/output/index</code>) records the data file offset at which each new shuffle
+ * key value starts, which allows shuffle key ranges to be located in the data file.
+ * <code>RangeShuffleFileWriteExec</code> is implemented with an assumption that input tuples are sorted in a
+ * specified order of shuffle keys.
+ */
+public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
+  private final SortSpec[] sortSpecs;
+  /** positions of the shuffle key columns within the input schema; resolved in init() */
+  private int [] indexKeys = null;
+  /** schema of the key tuples written to the index */
+  private Schema keySchema;
+
+  private BSTIndex.BSTIndexWriter indexWriter;
+  private TupleComparator comp;
+  private FileAppender appender;
+  private TableMeta meta;
+
+  /**
+   * @param context the task attempt whose work directory receives the data and index files
+   * @param sm not referenced by this executor; the appender is obtained through StorageManagerFactory
+   * @param child the executor producing the input tuples, assumed to be sorted by <code>sortSpecs</code>
+   * @param inSchema the schema of the input tuples
+   * @param outSchema the schema of the tuples written to the data file
+   * @param sortSpecs the shuffle keys and their sort order
+   */
+  public RangeShuffleFileWriteExec(final TaskAttemptContext context, final AbstractStorageManager sm,
+                                   final PhysicalExec child, final Schema inSchema, final Schema outSchema,
+                                   final SortSpec[] sortSpecs) throws IOException {
+    super(context, inSchema, outSchema, child);
+    this.sortSpecs = sortSpecs;
+  }
+
+  /**
+   * Resolves the key columns, creates the output directory and opens the data file appender and the index writer.
+   */
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    indexKeys = new int[sortSpecs.length];
+    keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+    for (int i = 0; i < sortSpecs.length; i++) {
+      indexKeys[i] = inSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
+    }
+
+    // use the task's configuration rather than a freshly constructed TajoConf
+    BSTIndex bst = new BSTIndex(context.getConf());
+    this.comp = new TupleComparator(keySchema, sortSpecs);
+    Path storeTablePath = new Path(context.getWorkDir(), "output");
+    LOG.info("Output data directory: " + storeTablePath);
+    // fall back to the RAW format when the task has no data channel
+    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+    this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+        outSchema, new Path(storeTablePath, "output"));
+    this.appender.enableStats();
+    this.appender.init();
+    this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    this.indexWriter.setLoadNum(100);
+    this.indexWriter.open();
+  }
+
+  /**
+   * Appends every child tuple to the data file and writes an index entry (key tuple, offset) whenever the
+   * shuffle key differs from the one of the previous tuple.
+   *
+   * @return always <code>null</code>; this executor emits no tuples of its own
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+    Tuple prevKeyTuple = null;
+    long offset;
+
+    while ((tuple = child.next()) != null) {
+      // read the offset before appending, so the index entry points at this tuple
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      keyTuple = new VTuple(keySchema.getColumnNum());
+      RowStoreUtil.project(tuple, keyTuple, indexKeys);
+      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
+        indexWriter.write(keyTuple, offset);
+        prevKeyTuple = keyTuple;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+
+  /**
+   * Flushes and closes the data file and the index, sets the data file statistics as the task result and
+   * registers a single shuffle output with id 0, named after the task id.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    appender.flush();
+    appender.close();
+    indexWriter.flush();
+    indexWriter.close();
+
+    // Collect statistics data
+    context.setResultStats(appender.getStats());
+    context.addShuffleFileOutput(0, context.getTaskId().toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 673e0b5..affdc86 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -21,33 +21,25 @@
*/
package org.apache.tajo.engine.planner.physical;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
/**
- * This physical operator stores a relation into a table.
+ * This is a physical executor to store a table part into a specified storage: the child's tuples are appended to the task's output path using the storage type of the plan.
*/
public class StoreTableExec extends UnaryPhysicalExec {
- private final StoreTableNode plan;
+ private final PersistentStoreNode plan;
private Appender appender;
private Tuple tuple;
-
- /**
- * @throws java.io.IOException
- *
- */
- public StoreTableExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+
+ public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
}
@@ -62,16 +54,9 @@ public class StoreTableExec extends UnaryPhysicalExec {
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}
- if (context.isInterQuery()) {
- Path storeTablePath = new Path(context.getWorkDir(), "out");
- FileSystem fs = new RawLocalFileSystem();
- fs.mkdirs(storeTablePath);
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
- StorageUtil.concatPath(storeTablePath, "0"));
- } else {
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
- context.getOutputPath());
- }
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
+ context.getOutputPath()); // the former inter-query branch (workDir/out/0) was removed; every store now targets the task output path
+
appender.enableStats();
appender.init();
}
@@ -100,8 +85,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
appender.close();
// Collect statistics data
-// ctx.addStatSet(annotation.getType().toString(), appender.getStats());
context.setResultStats(appender.getStats());
- context.addRepartition(0, context.getTaskId().toString());
+ context.addShuffleFileOutput(0, context.getTaskId().toString()); // NOTE(review): the data is written to context.getOutputPath(), not to a shuffle directory; confirm this registration is still needed
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 5e7c82f..85dfa2e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -32,7 +32,6 @@ import org.apache.tajo.QueryUnitId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
import org.apache.tajo.master.FragmentPair;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -50,6 +49,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
public class QueryUnit implements EventHandler<TaskEvent> {
/** Class Logger */
@@ -65,7 +65,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private Map<String, Set<FragmentProto>> fragMap;
private Map<String, Set<URI>> fetchMap;
- private List<Partition> partitions;
+ private List<ShuffleFileOutput> partitions; // shuffle file outputs reported by the task; the field keeps its pre-rename name
private TableStats stats;
private final boolean isLeafTask;
private List<IntermediateEntry> intermediateData;
@@ -127,7 +127,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
scan = new ArrayList<ScanNode>();
fetchMap = Maps.newHashMap();
fragMap = Maps.newHashMap();
- partitions = new ArrayList<Partition>();
+ partitions = new ArrayList<ShuffleFileOutput>();
attempts = Collections.emptyMap();
lastAttemptId = null;
nextAttempt = -1;
@@ -329,7 +329,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
this.stats = stats;
}
- public void setPartitions(List<Partition> partitions) {
+ public void setPartitions(List<ShuffleFileOutput> partitions) { // stored as an unmodifiable view of the given list
this.partitions = Collections.unmodifiableList(partitions);
}
@@ -337,7 +337,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.stats;
}
- public List<Partition> getPartitions() {
+ public List<ShuffleFileOutput> getPartitions() {
return this.partitions;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 0c50704..f65f810 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
@@ -40,6 +39,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
@@ -186,13 +187,13 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
}
private void fillTaskStatistics(TaskCompletionReport report) {
- if (report.getPartitionsCount() > 0) {
- this.getQueryUnit().setPartitions(report.getPartitionsList());
+ if (report.getShuffleFileOutputsCount() > 0) {
+ this.getQueryUnit().setPartitions(report.getShuffleFileOutputsList());
List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
- for (Partition p : report.getPartitionsList()) {
+ for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { // one IntermediateEntry per reported shuffle output, located at this attempt's host and pull-server port
IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
- getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
+ getId().getId(), p.getPartId(), getHost(), getPullServerPort());
partitions.add(entry);
}
this.getQueryUnit().setIntermediateData(partitions);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 2185acf..bec0620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -55,9 +55,9 @@ import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.RANGE_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
/**
* Repartitioner creates non-leaf tasks and shuffles intermediate data.
@@ -223,7 +223,7 @@ public class Repartitioner {
for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
execBlock.getId(),
- partitionId, HASH_PARTITION,
+ partitionId, HASH_SHUFFLE,
requestPerNode.getValue());
fetchURIs.addAll(uris);
}
@@ -257,9 +257,9 @@ public class Repartitioner {
MasterPlan masterPlan, SubQuery subQuery, SubQuery childSubQuery,
DataChannel channel, int maxNum)
throws InternalException {
- if (channel.getPartitionType() == HASH_PARTITION) {
+ if (channel.getShuffleType() == HASH_SHUFFLE) {
scheduleHashPartitionedFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
- } else if (channel.getPartitionType() == RANGE_PARTITION) {
+ } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
scheduleRangePartitionedFetches(schedulerContext, subQuery, childSubQuery, channel, maxNum);
} else {
throw new InternalException("Cannot support partition type");
@@ -281,7 +281,7 @@ public class Repartitioner {
SortNode sortNode = PlannerUtil.findTopNode(childSubQuery.getBlock().getPlan(), NodeType.SORT);
SortSpec [] sortSpecs = sortNode.getSortKeys();
- Schema sortSchema = new Schema(channel.getPartitionKey());
+ Schema sortSchema = new Schema(channel.getShuffleKeys());
// calculate the number of maximum query ranges
TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, stat.getColumnStats());
@@ -420,7 +420,7 @@ public class Repartitioner {
hashedByHost = hashByHost(interm.getValue());
for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
- interm.getKey(), channel.getPartitionType(), e.getValue());
+ interm.getKey(), channel.getShuffleType(), e.getValue());
if (finalFetchURI.containsKey(interm.getKey())) {
finalFetchURI.get(interm.getKey()).addAll(uris);
@@ -449,7 +449,7 @@ public class Repartitioner {
}
public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
- int partitionId, PartitionType type, List<IntermediateEntry> entries) {
+ int partitionId, ShuffleType type, List<IntermediateEntry> entries) {
String scheme = "http://";
StringBuilder urlPrefix = new StringBuilder(scheme);
urlPrefix.append(hostAndPort).append("/?")
@@ -457,9 +457,9 @@ public class Repartitioner {
.append("&sid=").append(ebid.getId())
.append("&p=").append(partitionId)
.append("&type=");
- if (type == HASH_PARTITION) {
+ if (type == HASH_SHUFFLE) {
urlPrefix.append("h");
- } else if (type == RANGE_PARTITION) {
+ } else if (type == RANGE_SHUFFLE) {
urlPrefix.append("r");
}
urlPrefix.append("&ta=");
@@ -542,22 +542,22 @@ public class Repartitioner {
// set the partition number for the current logicalUnit
// TODO: the union handling is required when a join has unions as its child
MasterPlan masterPlan = subQuery.getMasterPlan();
- keys = channel.getPartitionKey();
+ keys = channel.getShuffleKeys();
if (!masterPlan.isRoot(subQuery.getBlock()) ) {
ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
if (parentBlock.getPlan().getType() == NodeType.JOIN) {
- channel.setPartitionNum(desiredNum);
+ channel.setShuffleOutputNum(desiredNum);
}
}
// set the partition number for group by and sort
- if (channel.getPartitionType() == HASH_PARTITION) {
+ if (channel.getShuffleType() == HASH_SHUFFLE) {
if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
GroupbyNode groupby = (GroupbyNode) execBlock.getPlan();
keys = groupby.getGroupingColumns();
}
- } else if (channel.getPartitionType() == RANGE_PARTITION) {
+ } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
if (execBlock.getPlan().getType() == NodeType.SORT) {
SortNode sort = (SortNode) execBlock.getPlan();
keys = new Column[sort.getSortKeys().length];
@@ -568,11 +568,11 @@ public class Repartitioner {
}
if (keys != null) {
if (keys.length == 0) {
- channel.setPartitionKey(new Column[] {});
- channel.setPartitionNum(1);
+ channel.setShuffleKeys(new Column[]{});
+ channel.setShuffleOutputNum(1);
} else {
- channel.setPartitionKey(keys);
- channel.setPartitionNum(desiredNum);
+ channel.setShuffleKeys(keys);
+ channel.setShuffleOutputNum(desiredNum);
}
}
return subQuery;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 5979fbc..ef3a11f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -61,7 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
/**
@@ -552,7 +552,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
* methods and the number of partitions to a given subquery.
*/
private static void setRepartitionIfNecessary(SubQuery subQuery, DataChannel channel) {
- if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+ if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
int numTasks = calculatePartitionNum(subQuery, channel);
Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks, channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index a1c383a..eff384b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -104,7 +104,7 @@ public class Task {
private AtomicBoolean progressFlag = new AtomicBoolean(false);
// TODO - to be refactored
- private PartitionType partitionType = null;
+ private ShuffleType shuffleType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
@@ -163,9 +163,9 @@ public class Task {
interQuery = request.getProto().getInterQuery();
if (interQuery) {
context.setInterQuery();
- this.partitionType = context.getDataChannel().getPartitionType();
+ this.shuffleType = context.getDataChannel().getShuffleType();
- if (partitionType == PartitionType.RANGE_PARTITION) {
+ if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
@@ -185,7 +185,7 @@ public class Task {
LOG.info("==================================");
LOG.info("* Subquery " + request.getId() + " is initialized");
LOG.info("* InterQuery: " + interQuery
- + (interQuery ? ", Use " + this.partitionType + " partitioning":""));
+ + (interQuery ? ", Use " + this.shuffleType + " partitioning":""));
LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
@@ -326,13 +326,13 @@ public class Task {
builder.setResultStats(new TableStats().getProto());
}
- Iterator<Entry<Integer,String>> it = context.getRepartitions();
+ Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
if (it.hasNext()) {
do {
Entry<Integer,String> entry = it.next();
- Partition.Builder part = Partition.newBuilder();
- part.setPartitionKey(entry.getKey());
- builder.addPartitions(part.build());
+ ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+ part.setPartId(entry.getKey());
+ builder.addShuffleFileOutputs(part.build());
} while (it.hasNext());
}