You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/10 17:48:27 UTC
[2/2] git commit: TAJO-584: Improve distributed merge sort.
TAJO-584: Improve distributed merge sort.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/214b9741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/214b9741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/214b9741
Branch: refs/heads/master
Commit: 214b9741a510d1c2013e0dd494ab66017962367a
Parents: 4179a7c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Feb 11 01:48:15 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Feb 11 01:48:15 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../exception/AlreadyExistsTableException.java | 2 +-
.../org/apache/tajo/jdbc/TajoResultSet.java | 24 +-
.../tajo/engine/planner/BaseAlgebraVisitor.java | 17 +-
.../apache/tajo/engine/planner/LogicalPlan.java | 2 +-
.../engine/planner/PhysicalPlannerImpl.java | 149 ++++++---
.../apache/tajo/engine/planner/PlannerUtil.java | 10 +
.../engine/planner/PreLogicalPlanVerifier.java | 33 +-
.../engine/planner/RangePartitionAlgorithm.java | 86 +++--
.../engine/planner/UniformRangePartition.java | 160 +++++++---
.../engine/planner/global/GlobalPlanner.java | 2 +
.../planner/physical/ExternalSortExec.java | 77 +++--
.../planner/physical/UnaryPhysicalExec.java | 12 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 316 +------------------
.../tajo/master/querymaster/Repartitioner.java | 8 +-
.../tajo/worker/RangeRetrieverHandler.java | 9 +-
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../planner/TestUniformRangePartition.java | 68 ++--
.../planner/physical/TestBSTIndexExec.java | 3 +-
.../planner/physical/TestExternalSortExec.java | 6 +-
.../physical/TestLeftOuterHashJoinExec.java | 4 +
.../physical/TestLeftOuterNLJoinExec.java | 21 +-
.../planner/physical/TestPhysicalPlanner.java | 3 +
.../physical/TestRightOuterHashJoinExec.java | 5 +-
.../engine/planner/physical/TestSortExec.java | 6 +-
.../apache/tajo/engine/query/TestSortQuery.java | 10 +
.../apache/tajo/engine/util/TestTupleUtil.java | 126 +-------
.../tajo/worker/TestRangeRetrieverHandler.java | 12 +-
.../resources/dataset/TestSortQuery/table2.tbl | 24 ++
.../org/apache/tajo/jdbc/TestTajoResultSet.java | 63 ++++
.../TestJoinQuery/testOuterJoinAndCaseWhen1.sql | 5 +-
.../create_table_with_asc_desc_keys.sql | 1 +
.../TestSortQuery/testSortWithAscDescKeys.sql | 1 +
.../tajo/pullserver/PullServerAuxService.java | 9 +-
.../tajo/pullserver/TajoPullServerService.java | 18 +-
.../org/apache/tajo/storage/MergeScanner.java | 22 +-
.../java/org/apache/tajo/storage/RawFile.java | 24 +-
.../apache/tajo/storage/TupleComparator.java | 4 +
.../org/apache/tajo/storage/TupleRange.java | 26 +-
39 files changed, 680 insertions(+), 692 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8dc5ee5..d79ec91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-584: Improve distributed merge sort. (hyunsik)
+
TAJO-36: Improve ExternalSortExec with N-merge sort and final pass
omission. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
index 98c04b5..a2e3a6a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
@@ -26,6 +26,6 @@ public class AlreadyExistsTableException extends CatalogException {
}
public AlreadyExistsTableException(String tableName) {
- super("Already Exists Table: "+tableName);
+ super("Already exists table: "+tableName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 3977d76..942107c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -18,6 +18,7 @@
package org.apache.tajo.jdbc;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -25,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.FileScanner;
@@ -37,7 +37,6 @@ import org.apache.tajo.storage.fragment.FileFragment;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -63,7 +62,7 @@ public class TajoResultSet extends TajoResultSetBase {
fs = FileScanner.getFileSystem(conf, desc.getPath());
this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
- Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+ List<FileFragment> frags = getFragments(desc.getPath());
scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
}
init();
@@ -75,23 +74,30 @@ public class TajoResultSet extends TajoResultSetBase {
curRow = 0;
}
- static class FileNameComparator implements Comparator<FileStatus> {
+ public static class FileNameComparator implements Comparator<FileStatus> {
@Override
public int compare(FileStatus f1, FileStatus f2) {
- return f2.getPath().getName().compareTo(f1.getPath().getName());
+ return f1.getPath().getName().compareTo(f2.getPath().getName());
}
}
- private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
+ private List<FileFragment> getFragments(Path tablePath)
throws IOException {
- List<FileFragment> fraglist = Lists.newArrayList();
+ List<FileFragment> fragments = Lists.newArrayList();
FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().charAt(0) != '.';
}
});
+
+
+ // The files must be sorted in an ascending order of file names
+ // in order to guarantee the order of a sort operation.
+ // This is because our distributed sort algorithm outputs
+ // a sequence of data files, each of which contains rows
+ // sorted within that file.
Arrays.sort(files, new FileNameComparator());
String tbname = tablePath.getName();
@@ -99,9 +105,9 @@ public class TajoResultSet extends TajoResultSetBase {
if (files[i].getLen() == 0) {
continue;
}
- fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+ fragments.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
}
- return fraglist;
+ return ImmutableList.copyOf(fragments);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 6cc6fd0..25ee316 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -269,14 +269,19 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
@Override
public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
stack.push(expr);
- if (!expr.isAllProjected()) {
- for (NamedExpr target : expr.getNamedExprs()) {
- visit(ctx, stack, target);
+ try {
+ if (!expr.isAllProjected()) {
+ for (NamedExpr target : expr.getNamedExprs()) {
+ visit(ctx, stack, target);
+ }
}
+ if (expr.hasChild()) {
+ return visit(ctx, stack, expr.getChild());
+ }
+ } finally {
+ stack.pop();
}
- RESULT result = visit(ctx, stack, expr.getChild());
- stack.pop();
- return result;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 6afaaad..9890943 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -245,7 +245,7 @@ public class LogicalPlan {
// Trying to find the column within the current block
- if (block.currentNode != null) {
+ if (block.currentNode != null && block.currentNode.getInSchema() != null) {
Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName());
if (found != null) {
return found;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index dfa2e40..5583efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -35,6 +35,7 @@ import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -45,6 +46,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.List;
+import java.util.Stack;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
@@ -73,7 +75,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PhysicalExec execPlan;
try {
- execPlan = createPlanRecursive(context, logicalPlan);
+ execPlan = createPlanRecursive(context, logicalPlan, new Stack<LogicalNode>());
if (execPlan instanceof StoreTableExec
|| execPlan instanceof RangeShuffleFileWriteExec
|| execPlan instanceof HashShuffleFileWriteExec
@@ -103,7 +105,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return outExecPlan;
}
- private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+ private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
+ throws IOException {
PhysicalExec leftExec;
PhysicalExec rightExec;
@@ -111,7 +114,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
case ROOT:
LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
- return createPlanRecursive(ctx, rootNode.getChild());
+ stack.push(rootNode);
+ leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
+ stack.pop();
+ return leftExec;
case EXPRS:
EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,61 +127,81 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
case INSERT:
case STORE:
StoreTableNode storeNode = (StoreTableNode) logicalNode;
- leftExec = createPlanRecursive(ctx, storeNode.getChild());
+ stack.push(storeNode);
+ leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
+ stack.pop();
return createStorePlan(ctx, storeNode, leftExec);
case SELECTION:
SelectionNode selNode = (SelectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, selNode.getChild());
+ stack.push(selNode);
+ leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
+ stack.pop();
return new SelectionExec(ctx, selNode, leftExec);
case PROJECTION:
ProjectionNode prjNode = (ProjectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, prjNode.getChild());
+ stack.push(prjNode);
+ leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
+ stack.pop();
return new ProjectionExec(ctx, prjNode, leftExec);
case TABLE_SUBQUERY: {
TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
- leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
- ProjectionExec projectionExec = new ProjectionExec(ctx, subQueryNode, leftExec);
- return projectionExec;
+ stack.push(subQueryNode);
+ leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
+ stack.pop();
+ return new ProjectionExec(ctx, subQueryNode, leftExec);
+
}
case PARTITIONS_SCAN:
case SCAN:
- leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
+ leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
return leftExec;
case GROUP_BY:
GroupbyNode grpNode = (GroupbyNode) logicalNode;
- leftExec = createPlanRecursive(ctx, grpNode.getChild());
+ stack.push(grpNode);
+ leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
+ stack.pop();
return createGroupByPlan(ctx, grpNode, leftExec);
case HAVING:
HavingNode havingNode = (HavingNode) logicalNode;
- leftExec = createPlanRecursive(ctx, havingNode.getChild());
+ stack.push(havingNode);
+ leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
+ stack.pop();
return new HavingExec(ctx, havingNode, leftExec);
case SORT:
SortNode sortNode = (SortNode) logicalNode;
- leftExec = createPlanRecursive(ctx, sortNode.getChild());
+ stack.push(sortNode);
+ leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
+ stack.pop();
return createSortPlan(ctx, sortNode, leftExec);
case JOIN:
JoinNode joinNode = (JoinNode) logicalNode;
- leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
+ stack.push(joinNode);
+ leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
+ rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
+ stack.pop();
return createJoinPlan(ctx, joinNode, leftExec, rightExec);
case UNION:
UnionNode unionNode = (UnionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
+ stack.push(unionNode);
+ leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
+ rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
+ stack.pop();
return new UnionExec(ctx, leftExec, rightExec);
case LIMIT:
LimitNode limitNode = (LimitNode) logicalNode;
- leftExec = createPlanRecursive(ctx, limitNode.getChild());
+ stack.push(limitNode);
+ leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
+ stack.pop();
return new LimitExec(ctx, limitNode.getInSchema(),
limitNode.getOutSchema(), leftExec, limitNode);
@@ -745,41 +771,65 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec);
}
- public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
- Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
- "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
-
+ private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) {
Enforcer enforcer = ctx.getEnforcer();
+ List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
+ if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
+ SortNode sortNode = (SortNode) node.peek();
+ TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
+
+ boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
+ SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList());
+ return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
+ } else {
+ return false;
+ }
+ }
- // check if this table is broadcasted one or not.
- boolean broadcastFlag = false;
- if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
- List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
- for (EnforceProperty property : properties) {
- broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+ throws IOException {
+ Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
+ "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
+
+ // check if an input is sorted in the same order as the subsequent sort operator.
+ // TODO - it works only if input files are raw files. We should check the file format.
+ // Since the default intermediate file format is raw file, it is not problem right now.
+ if (checkIfSortEquivalance(ctx, scanNode, node)) {
+ FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+ return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments);
+ } else {
+ Enforcer enforcer = ctx.getEnforcer();
+
+ // check if this table is broadcasted one or not.
+ boolean broadcastFlag = false;
+ if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+ List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+ for (EnforceProperty property : properties) {
+ broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+ }
}
- }
- if (scanNode instanceof PartitionedTableScanNode
- && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
- ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
+ if (scanNode instanceof PartitionedTableScanNode
+ && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
+ ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
- if (scanNode instanceof PartitionedTableScanNode) {
- if (broadcastFlag) {
- PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
- List<FileFragment> fileFragments = TUtil.newList();
- for (Path path : partitionedTableScanNode.getInputPaths()) {
- fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
- }
+ if (scanNode instanceof PartitionedTableScanNode) {
+ if (broadcastFlag) {
+ PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+ List<FileFragment> fileFragments = TUtil.newList();
+ for (Path path : partitionedTableScanNode.getInputPaths()) {
+ fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+ }
- return new PartitionMergeScanExec(ctx, sm, scanNode,
- FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+ return new PartitionMergeScanExec(ctx, sm, scanNode,
+ FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+ }
}
}
- }
- FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
- return new SeqScanExec(ctx, sm, scanNode, fragments);
+ FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+ return new SeqScanExec(ctx, sm, scanNode, fragments);
+ }
}
public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -858,6 +908,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
PhysicalExec child) throws IOException {
+
+ // check if it is a distributed merge sort
+ // If so, it does not need to create a sort executor because
+ // the sort executor is created at the scan planning
+ if (child instanceof SortExec) {
+ SortExec childSortExec = (SortExec) child;
+ if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) {
+ return child;
+ }
+ }
+
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
if (property != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index b59cdda..9ada2ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -30,6 +30,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
@@ -749,4 +750,13 @@ public class PlannerUtil {
}
return names;
}
+
+ public static SortSpec [] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
+ SortSpec [] sortSpecs = new SortSpec[sortSpecProtos.size()];
+ int i = 0;
+ for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+ sortSpecs[i++] = new SortSpec(proto);
+ }
+ return sortSpecs;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 1843b5a..fe93f46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -58,11 +58,11 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
@Override
public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
- checkRelationExistence(state, expr.getName());
+ assertRelationExistence(state, expr.getName());
return expr;
}
- private boolean checkRelationExistence(VerificationState state, String name) {
+ private boolean assertRelationExistence(VerificationState state, String name) {
if (!catalog.existsTable(name)) {
state.addVerification(String.format("relation \"%s\" does not exist", name));
return false;
@@ -70,15 +70,34 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
return true;
}
+ private boolean assertRelationNoExistence(VerificationState state, String name) {
+ if (catalog.existsTable(name)) {
+ state.addVerification(String.format("relation \"%s\" already exists", name));
+ return false;
+ }
+ return true;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Data Definition Language Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public Expr visitCreateTable(VerificationState state, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+ super.visitCreateTable(state, stack, expr);
+ assertRelationNoExistence(state, expr.getTableName());
+ return expr;
+ }
+
///////////////////////////////////////////////////////////////////////////////////////////////////////////
// Insert or Update Section
///////////////////////////////////////////////////////////////////////////////////////////////////////////
- public Expr visitInsert(VerificationState context, Stack<Expr> stack, Insert expr) throws PlanningException {
- Expr child = super.visitInsert(context, stack, expr);
+ public Expr visitInsert(VerificationState state, Stack<Expr> stack, Insert expr) throws PlanningException {
+ Expr child = super.visitInsert(state, stack, expr);
if (expr.hasTableName()) {
- checkRelationExistence(context, expr.getTableName());
+ assertRelationExistence(state, expr.getTableName());
}
if (child != null && child.getType() == OpType.Projection) {
@@ -88,9 +107,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
int targetColumnNum = expr.getTargetColumns().length;
if (targetColumnNum > projectColumnNum) {
- context.addVerification("INSERT has more target columns than expressions");
+ state.addVerification("INSERT has more target columns than expressions");
} else if (targetColumnNum < projectColumnNum) {
- context.addVerification("INSERT has more expressions than target columns");
+ state.addVerification("INSERT has more expressions than target columns");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index fc56b55..5bff857 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -19,7 +19,7 @@
package org.apache.tajo.engine.planner;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.Tuple;
@@ -28,7 +28,7 @@ import org.apache.tajo.storage.TupleRange;
import java.math.BigDecimal;
public abstract class RangePartitionAlgorithm {
- protected Schema schema;
+ protected SortSpec [] sortSpecs;
protected TupleRange range;
protected final BigDecimal totalCard;
/** true if the end of the range is inclusive. Otherwise, it should be false. */
@@ -36,15 +36,15 @@ public abstract class RangePartitionAlgorithm {
/**
*
- * @param schema the schema of the range tuples
- * @param range range to be partition
+ * @param sortSpecs The array of sort keys
+ * @param totalRange The total range to be partition
* @param inclusive true if the end of the range is inclusive. Otherwise, false.
*/
- public RangePartitionAlgorithm(Schema schema, TupleRange range, boolean inclusive) {
- this.schema = schema;
- this.range = range;
+ public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
+ this.sortSpecs = sortSpecs;
+ this.range = totalRange;
this.inclusive = inclusive;
- this.totalCard = computeCardinalityForAllColumns(schema, range, inclusive);
+ this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
}
/**
@@ -56,7 +56,7 @@ public abstract class RangePartitionAlgorithm {
* @return
*/
public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
- boolean inclusive) {
+ boolean inclusive, boolean isAscending) {
BigDecimal columnCard;
switch (dataType.getType()) {
@@ -64,35 +64,75 @@ public abstract class RangePartitionAlgorithm {
columnCard = new BigDecimal(2);
break;
case CHAR:
- columnCard = new BigDecimal(end.asChar() - start.asChar());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asChar() - start.asChar());
+ } else {
+ columnCard = new BigDecimal(start.asChar() - end.asChar());
+ }
break;
case BIT:
- columnCard = new BigDecimal(end.asByte() - start.asByte());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asByte() - start.asByte());
+ } else {
+ columnCard = new BigDecimal(start.asByte() - end.asByte());
+ }
break;
case INT2:
- columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+ } else {
+ columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+ }
break;
case INT4:
- columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
break;
case INT8:
- columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
break;
case FLOAT4:
- columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
break;
case FLOAT8:
- columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
break;
case TEXT:
- columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+ } else {
+ columnCard = new BigDecimal(start.asChars().charAt(0) - end.asChars().charAt(0));
+ }
break;
case DATE:
- columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
break;
case TIME:
case TIMESTAMP:
- columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
break;
default:
throw new UnsupportedOperationException(dataType + " is not supported yet");
@@ -105,16 +145,16 @@ public abstract class RangePartitionAlgorithm {
* It computes the value cardinality of a tuple range.
* @return
*/
- public static BigDecimal computeCardinalityForAllColumns(Schema schema, TupleRange range, boolean inclusive) {
+ public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
Tuple start = range.getStart();
Tuple end = range.getEnd();
Column col;
BigDecimal cardinality = new BigDecimal(1);
BigDecimal columnCard;
- for (int i = 0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- columnCard = computeCardinality(col.getDataType(), start.get(i), end.get(i), inclusive);
+ for (int i = 0; i < sortSpecs.length; i++) {
+ columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+ sortSpecs[i].isAscending());
if (new BigDecimal(0).compareTo(columnCard) < 0) {
cardinality = cardinality.multiply(columnCard);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 4f18c95..948b19e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.exception.RangeOverflowException;
@@ -33,6 +33,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
+
public class UniformRangePartition extends RangePartitionAlgorithm {
private int variableId;
private BigDecimal[] cardForEachDigit;
@@ -40,16 +41,16 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
/**
*
- * @param schema
- * @param range
+ * @param totalRange
+ * @param sortSpecs The description of sort keys
* @param inclusive true if the end of the range is inclusive
*/
- public UniformRangePartition(Schema schema, TupleRange range, boolean inclusive) {
- super(schema, range, inclusive);
- colCards = new BigDecimal[schema.getColumnNum()];
- for (int i = 0; i < schema.getColumnNum(); i++) {
- colCards[i] = computeCardinality(schema.getColumn(i).getDataType(), range.getStart().get(i),
- range.getEnd().get(i), inclusive);
+ public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+ super(sortSpecs, totalRange, inclusive);
+ colCards = new BigDecimal[sortSpecs.length];
+ for (int i = 0; i < sortSpecs.length; i++) {
+ colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
+ totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
}
cardForEachDigit = new BigDecimal[colCards.length];
@@ -62,15 +63,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
}
}
- public UniformRangePartition(Schema schema, TupleRange range) {
- this(schema, range, true);
+ public UniformRangePartition(TupleRange range, SortSpec [] sortSpecs) {
+ this(range, sortSpecs, true);
}
@Override
public TupleRange[] partition(int partNum) {
Preconditions.checkArgument(partNum > 0,
"The number of partitions must be positive, but the given number: "
- + partNum);
+ + partNum);
Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
"the number of partition cannot exceed total cardinality (" + totalCard + ")");
@@ -97,10 +98,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
Tuple last = range.getStart();
while(reminder.compareTo(new BigDecimal(0)) > 0) {
if (reminder.compareTo(term) <= 0) { // final one is inclusive
- ranges.add(new TupleRange(schema, last, range.getEnd()));
+ ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
} else {
Tuple next = increment(last, term.longValue(), variableId);
- ranges.add(new TupleRange(schema, last, next));
+ ranges.add(new TupleRange(sortSpecs, last, next));
}
last = ranges.get(ranges.size() - 1).getEnd();
reminder = reminder.subtract(term);
@@ -109,49 +110,103 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
return ranges.toArray(new TupleRange[ranges.size()]);
}
- public boolean isOverflow(int colId, Datum last, BigDecimal inc) {
- Column column = schema.getColumn(colId);
+ /**
+ * Check whether an overflow occurs or not.
+ *
+ * @param colId The column id to be checked
+ * @param last
+ * @param inc
+ * @param sortSpecs
+ * @return
+ */
+ public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+ Column column = sortSpecs[colId].getSortKey();
BigDecimal candidate;
boolean overflow = false;
+
switch (column.getDataType().getType()) {
case BIT: {
- candidate = inc.add(new BigDecimal(last.asByte()));
- return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asByte()));
+ return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asByte()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+ }
}
case CHAR: {
- candidate = inc.add(new BigDecimal((int)last.asChar()));
- return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal((int)last.asChar()));
+ return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal((int)last.asChar()).subtract(inc);
+ return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+ }
}
case INT2: {
- candidate = inc.add(new BigDecimal(last.asInt2()));
- return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt2()));
+ return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt2()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+ }
}
+ case DATE:
case INT4: {
- candidate = inc.add(new BigDecimal(last.asInt4()));
- return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt4()));
+ return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt4()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+ }
}
+ case TIME:
+ case TIMESTAMP:
case INT8: {
- candidate = inc.add(new BigDecimal(last.asInt8()));
- return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt8()));
+ return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt8()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+ }
}
case FLOAT4: {
- candidate = inc.add(new BigDecimal(last.asFloat4()));
- return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asFloat4()));
+ return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asFloat4()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+ }
}
case FLOAT8: {
- candidate = inc.add(new BigDecimal(last.asFloat8()));
- return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asFloat8()));
+ return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asFloat8()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+ }
+
}
case TEXT: {
- candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
- return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
+ return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+ }
}
}
return overflow;
}
public long incrementAndGetReminder(int colId, Datum last, long inc) {
- Column column = schema.getColumn(colId);
+ Column column = sortSpecs[colId].getSortKey();
long reminder = 0;
switch (column.getDataType().getType()) {
case BIT: {
@@ -166,12 +221,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
reminder = candidate - end;
break;
}
+ case DATE:
case INT4: {
int candidate = (int) (last.asInt4() + inc);
int end = range.getEnd().get(colId).asInt4();
reminder = candidate - end;
break;
}
+ case TIME:
+ case TIMESTAMP:
case INT8: {
long candidate = last.asInt8() + inc;
long end = range.getEnd().get(colId).asInt8();
@@ -231,7 +289,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
int finalId = baseDigit;
incs[finalId] = value;
for (int i = finalId; i >= 0; i--) {
- if (isOverflow(i, last.get(i), incs[i])) {
+ if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
if (i == 0) {
throw new RangeOverflowException(range, last, incs[i].longValue());
}
@@ -253,10 +311,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
}
}
- Tuple end = new VTuple(schema.getColumnNum());
+ Tuple end = new VTuple(sortSpecs.length);
Column column;
for (int i = 0; i < last.size(); i++) {
- column = schema.getColumn(i);
+ column = sortSpecs[i].getSortKey();
switch (column.getDataType().getType()) {
case CHAR:
if (overflowFlag[i]) {
@@ -286,13 +344,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
end.put(i, DatumFactory.createInt4(
(int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
} else {
- end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+ if (sortSpecs[i].isAscending()) {
+ end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+ }
}
break;
case INT8:
if (overflowFlag[i]) {
end.put(i, DatumFactory.createInt8(
- range.getStart().get(i).asInt4() + incs[i].longValue()));
+ range.getStart().get(i).asInt8() + incs[i].longValue()));
} else {
end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
}
@@ -322,6 +384,28 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
((char) (last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
}
break;
+ case DATE:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+ }
+ break;
+ case TIME:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+ }
+ break;
+ case TIMESTAMP:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createTimeStampFromMillis(
+ range.getStart().get(i).asInt8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createTimeStampFromMillis(last.get(i).asInt8() + incs[i].longValue()));
+ }
+ break;
default:
throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 2aa93d7..d040d7e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -469,6 +469,7 @@ public class GlobalPlanner {
currentNode.setChild(secondScan);
currentNode.setInSchema(secondScan.getOutSchema());
currentBlock.setPlan(currentNode);
+ currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
}
} else {
LogicalNode childBlockPlan = childBlock.getPlan();
@@ -487,6 +488,7 @@ public class GlobalPlanner {
currentNode.setChild(secondScan);
currentNode.setInSchema(secondScan.getOutSchema());
currentBlock.setPlan(currentNode);
+ currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
masterPlan.addConnect(channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index a3b37fc..f0ac290 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -27,11 +27,14 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -76,8 +79,10 @@ public class ExternalSortExec extends SortExec {
private final LocalDirAllocator localDirAllocator;
/** local file system */
private final RawLocalFileSystem localFS;
- /** final output files */
+ /** final output files which are used for cleaning */
private List<Path> finalOutputFiles = null;
+ /** for directly merging sorted inputs */
+ private List<Path> mergedInputPaths = null;
///////////////////////////////////////////////////
// transient variables
@@ -89,10 +94,10 @@ public class ExternalSortExec extends SortExec {
/** the final result */
private Scanner result;
- public ExternalSortExec(final TaskAttemptContext context,
- final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
- throws IOException {
- super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
+ private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+ throws PhysicalPlanningException {
+ super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
+
this.plan = plan;
this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
@@ -111,6 +116,25 @@ public class ExternalSortExec extends SortExec {
localFS = new RawLocalFileSystem();
}
+ public ExternalSortExec(final TaskAttemptContext context,
+ final AbstractStorageManager sm, final SortNode plan,
+ final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
+ this(context, sm, plan);
+
+ mergedInputPaths = TUtil.newList();
+ for (CatalogProtos.FragmentProto proto : fragments) {
+ FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ mergedInputPaths.add(fragment.getPath());
+ }
+ }
+
+ public ExternalSortExec(final TaskAttemptContext context,
+ final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+ throws IOException {
+ this(context, sm, plan);
+ setChild(child);
+ }
+
public void init() throws IOException {
super.init();
}
@@ -120,9 +144,9 @@ public class ExternalSortExec extends SortExec {
}
/**
- * Sort tuple block and store them into a chunk file
+ * Sort a tuple block and store them into a chunk file
*/
- private final Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+ private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
throws IOException {
TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
int rowNum = tupleBlock.size();
@@ -156,7 +180,7 @@ public class ExternalSortExec extends SortExec {
* @return All paths of chunks
* @throws java.io.IOException
*/
- private final List<Path> sortAndStoreAllChunks() throws IOException {
+ private List<Path> sortAndStoreAllChunks() throws IOException {
Tuple tuple;
int memoryConsumption = 0;
List<Path> chunkPaths = TUtil.newList();
@@ -212,22 +236,31 @@ public class ExternalSortExec extends SortExec {
if (!sorted) { // if not sorted, first sort all data
- // Try to sort all data, and store them into a number of chunks if memory exceeds
- long startTimeOfChunkSplit = System.currentTimeMillis();
- List<Path> chunks = sortAndStoreAllChunks();
- long endTimeOfChunkSplit = System.currentTimeMillis();
- info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
-
- if (memoryResident) { // if all sorted data reside in a main-memory table.
- this.result = new MemTableScanner();
- } else { // if input data exceeds main-memory at least once
-
+ // if input files are given, it starts merging directly.
+ if (mergedInputPaths != null) {
try {
- this.result = externalMergeAndSort(chunks);
- } catch (Exception exception) {
- throw new PhysicalPlanningException(exception);
+ this.result = externalMergeAndSort(mergedInputPaths);
+ } catch (Exception e) {
+ throw new PhysicalPlanningException(e);
}
+ } else {
+ // Try to sort all data, and store them as multiple chunks if memory exceeds
+ long startTimeOfChunkSplit = System.currentTimeMillis();
+ List<Path> chunks = sortAndStoreAllChunks();
+ long endTimeOfChunkSplit = System.currentTimeMillis();
+ info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+ if (memoryResident) { // if all sorted data reside in a main-memory table.
+ this.result = new MemTableScanner();
+ } else { // if input data exceeds main-memory at least once
+
+ try {
+ this.result = externalMergeAndSort(chunks);
+ } catch (Exception e) {
+ throw new PhysicalPlanningException(e);
+ }
+ }
}
sorted = true;
@@ -511,7 +544,7 @@ public class ExternalSortExec extends SortExec {
outTuple = rightTuple;
rightTuple = rightScan.next();
}
- return outTuple;
+ return new VTuple(outTuple);
}
if (leftTuple == null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index a8dd877..f31455f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -44,14 +44,20 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
}
public void init() throws IOException {
- child.init();
+ if (child != null) {
+ child.init();
+ }
}
public void rescan() throws IOException {
- child.rescan();
+ if (child != null) {
+ child.rescan();
+ }
}
public void close() throws IOException {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 42508a0..b96b65e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -22,26 +22,20 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.dataserver.HttpUtil;
import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Iterator;
@@ -49,279 +43,8 @@ import java.util.List;
import java.util.Map;
public class TupleUtil {
- /** class logger **/
- private static final Log LOG = LogFactory.getLog(TupleUtil.class);
- /**
- * It computes the value cardinality of a tuple range.
- *
- * @param schema
- * @param range
- * @return
- */
- public static long computeCardinality(Schema schema, TupleRange range) {
- Tuple start = range.getStart();
- Tuple end = range.getEnd();
- Column col;
-
- long cardinality = 1;
- long columnCard;
- for (int i = 0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case CHAR:
- columnCard = end.get(i).asChar() - start.get(i).asChar();
- break;
- case BIT:
- columnCard = end.get(i).asByte() - start.get(i).asByte();
- break;
- case INT2:
- columnCard = end.get(i).asInt2() - start.get(i).asInt2();
- break;
- case INT4:
- columnCard = end.get(i).asInt4() - start.get(i).asInt4();
- break;
- case INT8:
- columnCard = end.get(i).asInt8() - start.get(i).asInt8();
- break;
- case FLOAT4:
- columnCard = end.get(i).asInt4() - start.get(i).asInt4();
- break;
- case FLOAT8:
- columnCard = end.get(i).asInt8() - start.get(i).asInt8();
- break;
- case TEXT:
- columnCard = end.get(i).asChars().charAt(0) - start.get(i).asChars().charAt(0);
- break;
- default:
- throw new UnsupportedOperationException(col.getDataType() + " is not supported yet");
- }
-
- if (columnCard > 0) {
- cardinality *= columnCard + 1;
- }
- }
-
- return cardinality;
- }
-
- public static TupleRange [] getPartitions(Schema schema, int partNum, TupleRange range) {
- Tuple start = range.getStart();
- Tuple end = range.getEnd();
- Column col;
- TupleRange [] partitioned = new TupleRange[partNum];
-
- Datum[] term = new Datum[schema.getColumnNum()];
- Datum[] prevValues = new Datum[schema.getColumnNum()];
-
- // initialize term and previous values
- for (int i = 0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- prevValues[i] = start.get(i);
- switch (col.getDataType().getType()) {
- case CHAR:
- int sChar = start.get(i).asChar();
- int eChar = end.get(i).asChar();
- int rangeChar;
- if ((eChar - sChar) > partNum) {
- rangeChar = (eChar - sChar) / partNum;
- } else {
- rangeChar = 1;
- }
- term[i] = DatumFactory.createInt4(rangeChar);
- case BIT:
- byte sByte = start.get(i).asByte();
- byte eByte = end.get(i).asByte();
- int rangeByte;
- if ((eByte - sByte) > partNum) {
- rangeByte = (eByte - sByte) / partNum;
- } else {
- rangeByte = 1;
- }
- term[i] = DatumFactory.createBit((byte) rangeByte);
- break;
-
- case INT2:
- short sShort = start.get(i).asInt2();
- short eShort = end.get(i).asInt2();
- int rangeShort;
- if ((eShort - sShort) > partNum) {
- rangeShort = (eShort - sShort) / partNum;
- } else {
- rangeShort = 1;
- }
- term[i] = DatumFactory.createInt2((short) rangeShort);
- break;
-
- case INT4:
- int sInt = start.get(i).asInt4();
- int eInt = end.get(i).asInt4();
- int rangeInt;
- if ((eInt - sInt) > partNum) {
- rangeInt = (eInt - sInt) / partNum;
- } else {
- rangeInt = 1;
- }
- term[i] = DatumFactory.createInt4(rangeInt);
- break;
-
- case INT8:
- long sLong = start.get(i).asInt8();
- long eLong = end.get(i).asInt8();
- long rangeLong;
- if ((eLong - sLong) > partNum) {
- rangeLong = ((eLong - sLong) / partNum);
- } else {
- rangeLong = 1;
- }
- term[i] = DatumFactory.createInt8(rangeLong);
- break;
-
- case FLOAT4:
- float sFloat = start.get(i).asFloat4();
- float eFloat = end.get(i).asFloat4();
- float rangeFloat;
- if ((eFloat - sFloat) > partNum) {
- rangeFloat = ((eFloat - sFloat) / partNum);
- } else {
- rangeFloat = 1;
- }
- term[i] = DatumFactory.createFloat4(rangeFloat);
- break;
- case FLOAT8:
- double sDouble = start.get(i).asFloat8();
- double eDouble = end.get(i).asFloat8();
- double rangeDouble;
- if ((eDouble - sDouble) > partNum) {
- rangeDouble = ((eDouble - sDouble) / partNum);
- } else {
- rangeDouble = 1;
- }
- term[i] = DatumFactory.createFloat8(rangeDouble);
- break;
- case TEXT:
- char sChars = start.get(i).asChars().charAt(0);
- char eChars = end.get(i).asChars().charAt(0);
- int rangeString;
- if ((eChars - sChars) > partNum) {
- rangeString = ((eChars - sChars) / partNum);
- } else {
- rangeString = 1;
- }
- term[i] = DatumFactory.createText(((char) rangeString) + "");
- break;
- case INET4:
- throw new UnsupportedOperationException();
- case BLOB:
- throw new UnsupportedOperationException();
- default:
- throw new UnsupportedOperationException();
- }
- }
-
- for (int p = 0; p < partNum; p++) {
- Tuple sTuple = new VTuple(schema.getColumnNum());
- Tuple eTuple = new VTuple(schema.getColumnNum());
- for (int i = 0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- sTuple.put(i, prevValues[i]);
- switch (col.getDataType().getType()) {
- case CHAR:
- char endChar = (char) (prevValues[i].asChar() + term[i].asChar());
- if (endChar > end.get(i).asByte()) {
- eTuple.put(i, end.get(i));
- } else {
- eTuple.put(i, DatumFactory.createChar(endChar));
- }
- prevValues[i] = DatumFactory.createChar(endChar);
- break;
- case BIT:
- byte endByte = (byte) (prevValues[i].asByte() + term[i].asByte());
- if (endByte > end.get(i).asByte()) {
- eTuple.put(i, end.get(i));
- } else {
- eTuple.put(i, DatumFactory.createBit(endByte));
- }
- prevValues[i] = DatumFactory.createBit(endByte);
- break;
- case INT2:
- int endShort = (short) (prevValues[i].asInt2() + term[i].asInt2());
- if (endShort > end.get(i).asInt2()) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createInt2((short) endShort));
- }
- prevValues[i] = DatumFactory.createInt2((short) endShort);
- break;
- case INT4:
- int endInt = (prevValues[i].asInt4() + term[i].asInt4());
- if (endInt > end.get(i).asInt4()) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createInt4(endInt));
- }
- prevValues[i] = DatumFactory.createInt4(endInt);
- break;
-
- case INT8:
- long endLong = (prevValues[i].asInt8() + term[i].asInt8());
- if (endLong > end.get(i).asInt8()) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createInt8(endLong));
- }
- prevValues[i] = DatumFactory.createInt8(endLong);
- break;
-
- case FLOAT4:
- float endFloat = (prevValues[i].asFloat4() + term[i].asFloat4());
- if (endFloat > end.get(i).asFloat4()) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createFloat4(endFloat));
- }
- prevValues[i] = DatumFactory.createFloat4(endFloat);
- break;
- case FLOAT8:
- double endDouble = (prevValues[i].asFloat8() + term[i].asFloat8());
- if (endDouble > end.get(i).asFloat8()) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createFloat8(endDouble));
- }
- prevValues[i] = DatumFactory.createFloat8(endDouble);
- break;
- case TEXT:
- String endString = ((char)(prevValues[i].asChars().charAt(0) + term[i].asChars().charAt(0))) + "";
- if (endString.charAt(0) > end.get(i).asChars().charAt(0)) {
- eTuple.put(i, end.get(i));
- } else {
- // TODO - to consider overflow
- eTuple.put(i, DatumFactory.createText(endString));
- }
- prevValues[i] = DatumFactory.createText(endString);
- break;
- case INET4:
- throw new UnsupportedOperationException();
- case BLOB:
- throw new UnsupportedOperationException();
- default:
- throw new UnsupportedOperationException();
- }
- }
- partitioned[p] = new TupleRange(schema, sTuple, eTuple);
- }
-
- return partitioned;
- }
-
- public static String rangeToQuery(Schema schema, TupleRange range,
- boolean ascendingFirstKey, boolean last)
+ public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder
@@ -345,32 +68,8 @@ public class TupleUtil {
return sb.toString();
}
- public static String [] rangesToQueries(final SortSpec[] sortSpec,
- final TupleRange[] ranges)
- throws UnsupportedEncodingException {
- Schema schema = PlannerUtil.sortSpecsToSchema(sortSpec);
- boolean ascendingFirstKey = sortSpec[0].isAscending();
- String [] params = new String[ranges.length];
- for (int i = 0; i < ranges.length; i++) {
- params[i] =
- rangeToQuery(schema, ranges[i], ascendingFirstKey,
- ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
- }
- return params;
- }
-
- public static TupleRange queryToRange(Schema schema, String query) throws UnsupportedEncodingException {
- Map<String,String> params = HttpUtil.getParamsFromQuery(query);
- String startUrlDecoded = URLDecoder.decode(params.get("start"), "utf-8");
- String endUrlDecoded = URLDecoder.decode(params.get("end"), "utf-8");
- byte [] startBytes = Base64.decodeBase64(startUrlDecoded);
- byte [] endBytes = Base64.decodeBase64(endUrlDecoded);
- return new TupleRange(schema, RowStoreUtil.RowStoreDecoder
- .toTuple(schema, startBytes), RowStoreUtil.RowStoreDecoder
- .toTuple(schema, endBytes));
- }
+ public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
- public static TupleRange columnStatToRange(Schema schema, Schema target, List<ColumnStats> colStats) {
Map<Column, ColumnStats> statSet = Maps.newHashMap();
for (ColumnStats stat : colStats) {
statSet.put(stat.getColumn(), stat);
@@ -385,11 +84,16 @@ public class TupleUtil {
Tuple endTuple = new VTuple(target.getColumnNum());
int i = 0;
for (Column col : target.getColumns()) {
- startTuple.put(i, statSet.get(col).getMinValue());
- endTuple.put(i, statSet.get(col).getMaxValue());
+ if (sortSpecs[i].isAscending()) {
+ startTuple.put(i, statSet.get(col).getMinValue());
+ endTuple.put(i, statSet.get(col).getMaxValue());
+ } else {
+ startTuple.put(i, statSet.get(col).getMaxValue());
+ endTuple.put(i, statSet.get(col).getMinValue());
+ }
i++;
}
- return new TupleRange(target, startTuple, endTuple);
+ return new TupleRange(sortSpecs, startTuple, endTuple);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 71ab8f9..8ea824e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -308,8 +308,8 @@ public class Repartitioner {
// calculate the number of maximum query ranges
TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
- TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, totalStat.getColumnStats());
- RangePartitionAlgorithm partitioner = new UniformRangePartition(sortSchema, mergedRange);
+ TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+ RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
BigDecimal card = partitioner.getTotalCardinality();
// if the number of the range cardinality is less than the desired number of tasks,
@@ -355,8 +355,8 @@ public class Repartitioner {
for (int i = 0; i < ranges.length; i++) {
uris = new HashSet<URI>();
for (String uri: basicFetchURIs) {
- String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
- ascendingFirstKey, ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
+ String rangeParam =
+ TupleUtil.rangeToQuery(sortSchema, ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
URI finalUri = URI.create(uri + "&" + rangeParam);
uris.add(finalUri);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index 26991a0..a54fa80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -29,7 +29,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.worker.dataserver.retriever.FileChunk;
import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -113,7 +112,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
startOffset = idxReader.find(start);
} catch (IOException ioe) {
LOG.error("State Dump (the requested range: "
- + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ idxReader.getLastKey());
throw ioe;
}
@@ -124,7 +123,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
}
} catch (IOException ioe) {
LOG.error("State Dump (the requested range: "
- + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ idxReader.getLastKey());
throw ioe;
}
@@ -136,7 +135,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
startOffset = idxReader.find(start, true);
} catch (IOException ioe) {
LOG.error("State Dump (the requested range: "
- + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ idxReader.getLastKey());
throw ioe;
}
@@ -145,7 +144,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
if (startOffset == -1) {
throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
"State Dump (the requested range: "
- + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ idxReader.getLastKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 1b72d13..8a0b63a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -166,7 +166,7 @@ public class Task {
this.shuffleType = context.getDataChannel().getShuffleType();
if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
- SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
+ SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index acd663f..3d5cdf2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner;
+import org.apache.tajo.catalog.SortSpec;
import org.junit.Test;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -39,6 +40,9 @@ public class TestUniformRangePartition {
Schema schema = new Schema()
.addColumn("l_returnflag", Type.TEXT)
.addColumn("l_linestatus", Type.TEXT);
+
+ SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(2);
s.put(0, DatumFactory.createText("A"));
s.put(1, DatumFactory.createText("A"));
@@ -46,11 +50,10 @@ public class TestUniformRangePartition {
e.put(0, DatumFactory.createText("D"));
e.put(1, DatumFactory.createText("C"));
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner =
- new UniformRangePartition(schema, expected);
- assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+ assertEquals(12, partitioner.getTotalCardinality().intValue());
String [] result = new String[12];
result[0] = "AA";
@@ -84,6 +87,9 @@ public class TestUniformRangePartition {
Schema schema = new Schema()
.addColumn("l_returnflag", Type.TEXT)
.addColumn("l_linestatus", Type.TEXT);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(2);
s.put(0, DatumFactory.createText("A"));
s.put(1, DatumFactory.createText("A"));
@@ -91,11 +97,10 @@ public class TestUniformRangePartition {
e.put(0, DatumFactory.createText("D"));
e.put(1, DatumFactory.createText("C"));
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner =
- new UniformRangePartition(schema, expected);
- assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+ assertEquals(12, partitioner.getTotalCardinality().intValue());
String [] result = new String[12];
result[0] = "AA";
@@ -129,6 +134,8 @@ public class TestUniformRangePartition {
.addColumn("l_linestatus", Type.TEXT)
.addColumn("final", Type.TEXT);
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(3);
s.put(0, DatumFactory.createText("A"));
s.put(1, DatumFactory.createText("A"));
@@ -138,11 +145,10 @@ public class TestUniformRangePartition {
e.put(1, DatumFactory.createText("B")); // 2
e.put(2, DatumFactory.createText("C")); // x3 = 24
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner =
- new UniformRangePartition(schema, expected);
- assertEquals(24, TupleUtil.computeCardinality(schema, expected));
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+ assertEquals(24, partitioner.getTotalCardinality().intValue());
Tuple overflowBefore = partitioner.increment(s, 5, 2);
assertEquals("A", overflowBefore.get(0).asChars());
@@ -159,6 +165,9 @@ public class TestUniformRangePartition {
Schema schema = new Schema()
.addColumn("l_orderkey", Type.INT8)
.addColumn("l_linenumber", Type.INT8);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(2);
s.put(0, DatumFactory.createInt8(10));
s.put(1, DatumFactory.createInt8(20));
@@ -166,10 +175,9 @@ public class TestUniformRangePartition {
e.put(0, DatumFactory.createInt8(19));
e.put(1, DatumFactory.createInt8(39));
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner =
- new UniformRangePartition(schema, expected);
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
assertEquals(200, partitioner.getTotalCardinality().longValue());
Tuple range2 = partitioner.increment(s, 100, 1);
@@ -185,6 +193,9 @@ public class TestUniformRangePartition {
.addColumn("l_orderkey", Type.INT8)
.addColumn("l_linenumber", Type.INT8)
.addColumn("final", Type.INT8);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(3);
s.put(0, DatumFactory.createInt8(1));
s.put(1, DatumFactory.createInt8(1));
@@ -194,10 +205,9 @@ public class TestUniformRangePartition {
e.put(1, DatumFactory.createInt8(2)); // 2
e.put(2, DatumFactory.createInt8(3)); //x3 = 24
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner
- = new UniformRangePartition(schema, expected);
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
assertEquals(24, partitioner.getTotalCardinality().longValue());
Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -216,6 +226,9 @@ public class TestUniformRangePartition {
.addColumn("l_orderkey", Type.FLOAT8)
.addColumn("l_linenumber", Type.FLOAT8)
.addColumn("final", Type.FLOAT8);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(3);
s.put(0, DatumFactory.createFloat8(1.1d));
s.put(1, DatumFactory.createFloat8(1.1d));
@@ -225,10 +238,9 @@ public class TestUniformRangePartition {
e.put(1, DatumFactory.createFloat8(2.1d)); // 2
e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
- UniformRangePartition partitioner =
- new UniformRangePartition(schema, expected);
+ UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
assertEquals(24, partitioner.getTotalCardinality().longValue());
Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -246,15 +258,18 @@ public class TestUniformRangePartition {
Schema schema = new Schema();
schema.addColumn("l_returnflag", Type.TEXT);
schema.addColumn("l_linestatus", Type.TEXT);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(2);
s.put(0, DatumFactory.createText("A"));
s.put(1, DatumFactory.createText("F"));
Tuple e = new VTuple(2);
e.put(0, DatumFactory.createText("R"));
e.put(1, DatumFactory.createText("O"));
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
RangePartitionAlgorithm partitioner
- = new UniformRangePartition(schema, expected, true);
+ = new UniformRangePartition(expected, sortSpecs, true);
TupleRange [] ranges = partitioner.partition(31);
@@ -273,15 +288,18 @@ public class TestUniformRangePartition {
Schema schema = new Schema()
.addColumn("l_returnflag", Type.TEXT)
.addColumn("l_linestatus", Type.TEXT);
+
+ SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
Tuple s = new VTuple(2);
s.put(0, DatumFactory.createText("A"));
s.put(1, DatumFactory.createText("F"));
Tuple e = new VTuple(2);
e.put(0, DatumFactory.createText("R"));
e.put(1, DatumFactory.createText("O"));
- TupleRange expected = new TupleRange(schema, s, e);
+ TupleRange expected = new TupleRange(sortSpecs, s, e);
RangePartitionAlgorithm partitioner =
- new UniformRangePartition(schema, expected, true);
+ new UniformRangePartition(expected, sortSpecs, true);
TupleRange [] ranges = partitioner.partition(1);
assertEquals(expected, ranges[0]);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index cf24054..97932e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -51,6 +51,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
+import java.util.Stack;
import static org.junit.Assert.assertEquals;
@@ -191,7 +192,7 @@ public class TestBSTIndexExec {
}
@Override
- public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
throws IOException {
Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
"Error: There is no table matched to %s", scanNode.getTableName());