You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/17 10:23:34 UTC
[04/12] TAJO-501: Rewrite the projection part of logical planning.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 74ef38a..9d66dd5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -24,45 +24,92 @@ import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.NamedExprsManager;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
import org.apache.tajo.engine.planner.logical.JoinNode;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
+
+ private String [] guessRelationsFromJoinQual(LogicalPlan.QueryBlock block, EvalNode joinCondition)
+ throws PlanningException {
+
+ // Note that we can guarantee that each join qual used here is a singleton.
+ // This is because we dissect a join qual into its conjunctive normal form.
+ // In other words, each join qual has a form 'col1 = col2'.
+ Column leftExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getLeftExpr()).get(0);
+ Column rightExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getRightExpr()).get(0);
+
+ // 0 - left table, 1 - right table
+ String [] relationNames = new String[2];
+
+ NamedExprsManager namedExprsMgr = block.getNamedExprsManager();
+ if (leftExpr.hasQualifier()) {
+ relationNames[0] = leftExpr.getQualifier();
+ } else {
+ if (namedExprsMgr.isAliasedName(leftExpr.getColumnName())) {
+ String columnName = namedExprsMgr.getOriginalName(leftExpr.getColumnName());
+ String [] parts = columnName.split("\\.");
+
+ if (parts.length != 2) {
+ throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+ }
+
+ relationNames[0] = parts[0];
+ } else {
+ throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+ }
+ }
+
+ if (rightExpr.hasQualifier()) {
+ relationNames[1] = rightExpr.getQualifier();
+ } else {
+ if (namedExprsMgr.isAliasedName(rightExpr.getColumnName())) {
+ String columnName = namedExprsMgr.getOriginalName(rightExpr.getColumnName());
+ String [] parts = columnName.split("\\.");
+ if (parts.length != 2) {
+ throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+ }
+ relationNames[1] = parts[0];
+ } else {
+ throw new PlanningException("Cannot expect a referenced relation: " + rightExpr);
+ }
+ }
+
+ return relationNames;
+ }
public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
JoinNode joinNode) throws PlanningException {
Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
Set<EvalNode> nonJoinQuals = Sets.newHashSet();
for (EvalNode singleQual : cnf) {
if (PlannerUtil.isJoinQual(singleQual)) {
- List<Column> leftExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getLeftExpr());
- List<Column> rightExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getRightExpr());
- String leftExprRelation = leftExpr.get(0).getQualifier();
- String rightExprRelName = rightExpr.get(0).getQualifier();
+ String [] relations = guessRelationsFromJoinQual(block, singleQual);
+ String leftExprRelName = relations[0];
+ String rightExprRelName = relations[1];
Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
- boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelation);
+ boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelName);
JoinEdge edge;
- edge = getEdge(leftExprRelation, rightExprRelName);
+ edge = getEdge(leftExprRelName, rightExprRelName);
if (edge != null) {
edge.addJoinQual(singleQual);
} else {
if (isLeftExprForLeftTable) {
edge = new JoinEdge(joinNode.getJoinType(),
- block.getRelation(leftExprRelation), block.getRelation(rightExprRelName), singleQual);
- addEdge(leftExprRelation, rightExprRelName, edge);
+ block.getRelation(leftExprRelName), block.getRelation(rightExprRelName), singleQual);
+ addEdge(leftExprRelName, rightExprRelName, edge);
} else {
edge = new JoinEdge(joinNode.getJoinType(),
- block.getRelation(rightExprRelName), block.getRelation(leftExprRelation), singleQual);
- addEdge(rightExprRelName, leftExprRelation, edge);
+ block.getRelation(rightExprRelName), block.getRelation(leftExprRelName), singleQual);
+ addEdge(rightExprRelName, leftExprRelName, edge);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index e0f5f4c..ed82cef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -19,7 +19,6 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.collect.Sets;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.DatumFactory;
@@ -29,8 +28,11 @@ import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalType;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
public abstract class AggregationExec extends UnaryPhysicalExec {
@@ -43,24 +45,12 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
protected EvalContext evalContexts [];
protected Schema evalSchema;
- protected EvalNode havingQual;
- protected EvalContext havingContext;
-
public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
- if (plan.hasHavingCondition()) {
- this.havingQual = plan.getHavingCondition();
- this.havingContext = plan.getHavingCondition().newContext();
- }
-
- if (plan.getHavingSchema() != null) {
- this.evalSchema = plan.getHavingSchema();
- } else {
- this.evalSchema = plan.getOutSchema();
- }
+ evalSchema = plan.getOutSchema();
nonNullGroupingFields = Sets.newHashSet();
// keylist will contain a list of IDs of grouping column
@@ -72,27 +62,25 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
nonNullGroupingFields.add(col);
}
- // measureList will contain a list of IDs of measure fields
- int valueIdx = 0;
- measureList = new int[plan.getTargets().length - keylist.length];
- if (measureList.length > 0) {
- search: for (int inputIdx = 0; inputIdx < plan.getTargets().length; inputIdx++) {
- for (int key : keylist) { // eliminate key field
- if (plan.getTargets()[inputIdx].getColumnSchema().getColumnName()
- .equals(inSchema.getColumn(key).getColumnName())) {
- continue search;
- }
- }
- measureList[valueIdx] = inputIdx;
- valueIdx++;
+ // measureList will contain a list of measure field indexes against the target list.
+ List<Integer> measureIndexes = TUtil.newList();
+ for (int i = 0; i < plan.getTargets().length; i++) {
+ Target target = plan.getTargets()[i];
+ if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
+ measureIndexes.add(i);
}
}
+ measureList = new int[measureIndexes.size()];
+ for (int i = 0; i < measureIndexes.size(); i++) {
+ measureList[i] = measureIndexes.get(i);
+ }
+
evals = new EvalNode[plan.getTargets().length];
evalContexts = new EvalContext[plan.getTargets().length];
for (int i = 0; i < plan.getTargets().length; i++) {
Target t = plan.getTargets()[i];
- if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getColumnSchema())) {
+ if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
evals[i] = new ConstEval(DatumFactory.createNullDatum());
evalContexts[i] = evals[i].newContext();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index b2737b7..cf56200 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,13 +18,13 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -35,46 +35,57 @@ import java.util.List;
public class BNLJoinExec extends BinaryPhysicalExec {
// from logical plan
- private JoinNode plan;
- private EvalNode joinQual;
- private EvalContext qualCtx;
+ private final JoinNode plan;
+ private final boolean hasJoinQual;
+ private final EvalNode joinQual;
+ private final EvalContext qualCtx;
- private final List<Tuple> outerTupleSlots;
- private final List<Tuple> innerTupleSlots;
- private Iterator<Tuple> outerIterator;
- private Iterator<Tuple> innerIterator;
+ private final List<Tuple> leftTupleSlots;
+ private final List<Tuple> rightTupleSlots;
+ private Iterator<Tuple> leftIterator;
+ private Iterator<Tuple> rightIterator;
- private boolean innerEnd;
- private boolean outerEnd;
+ private boolean leftEnd;
+ private boolean rightEnd;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
- private Tuple outerTuple = null;
+ private Tuple leftTuple = null;
private Tuple outputTuple = null;
- private Tuple innext = null;
+ private Tuple rightNext = null;
private final int TUPLE_SLOT_SIZE = 10000;
// projection
- private final int[] targetIds;
+ private final Projector projector;
+ private final EvalContext [] evalContexts;
public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
- final PhysicalExec outer, PhysicalExec inner) {
- super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner);
+ final PhysicalExec leftExec, PhysicalExec rightExec) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), leftExec, rightExec);
this.plan = plan;
this.joinQual = plan.getJoinQual();
if (joinQual != null) { // if join type is not 'cross join'
+ hasJoinQual = true;
this.qualCtx = this.joinQual.newContext();
+ } else {
+ hasJoinQual = false;
+ this.qualCtx = null;
}
- this.outerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
- this.innerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
- this.outerIterator = outerTupleSlots.iterator();
- this.innerIterator = innerTupleSlots.iterator();
- this.innerEnd = false;
- this.outerEnd = false;
+ this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+ this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+ this.leftIterator = leftTupleSlots.iterator();
+ this.rightIterator = rightTupleSlots.iterator();
+ this.rightEnd = false;
+ this.leftEnd = false;
// for projection
- targetIds = RowStoreUtil.getTargetIds(inSchema, outSchema);
+ if (!plan.hasTargets()) {
+ plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
+ }
+
+ projector = new Projector(inSchema, outSchema, plan.getTargets());
+ evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -87,106 +98,108 @@ public class BNLJoinExec extends BinaryPhysicalExec {
public Tuple next() throws IOException {
- if (outerTupleSlots.isEmpty()) {
+ if (leftTupleSlots.isEmpty()) {
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
Tuple t = leftChild.next();
if (t == null) {
- outerEnd = true;
+ leftEnd = true;
break;
}
- outerTupleSlots.add(t);
+ leftTupleSlots.add(t);
}
- outerIterator = outerTupleSlots.iterator();
- outerTuple = outerIterator.next();
+ leftIterator = leftTupleSlots.iterator();
+ leftTuple = leftIterator.next();
}
- if (innerTupleSlots.isEmpty()) {
+ if (rightTupleSlots.isEmpty()) {
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
Tuple t = rightChild.next();
if (t == null) {
- innerEnd = true;
+ rightEnd = true;
break;
}
- innerTupleSlots.add(t);
+ rightTupleSlots.add(t);
}
- innerIterator = innerTupleSlots.iterator();
+ rightIterator = rightTupleSlots.iterator();
}
- if((innext = rightChild.next()) == null){
- innerEnd = true;
+ if((rightNext = rightChild.next()) == null){
+ rightEnd = true;
}
while (true) {
- if (!innerIterator.hasNext()) { // if inneriterator ended
- if (outerIterator.hasNext()) { // if outertupleslot remains
- outerTuple = outerIterator.next();
- innerIterator = innerTupleSlots.iterator();
+ if (!rightIterator.hasNext()) { // if rightIterator ended
+ if (leftIterator.hasNext()) { // if leftTupleSlots remains
+ leftTuple = leftIterator.next();
+ rightIterator = rightTupleSlots.iterator();
} else {
- if (innerEnd) {
+ if (rightEnd) {
rightChild.rescan();
- innerEnd = false;
+ rightEnd = false;
- if (outerEnd) {
+ if (leftEnd) {
return null;
}
- outerTupleSlots.clear();
+ leftTupleSlots.clear();
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
Tuple t = leftChild.next();
if (t == null) {
- outerEnd = true;
+ leftEnd = true;
break;
}
- outerTupleSlots.add(t);
+ leftTupleSlots.add(t);
}
- if (outerTupleSlots.isEmpty()) {
+ if (leftTupleSlots.isEmpty()) {
return null;
}
- outerIterator = outerTupleSlots.iterator();
- outerTuple = outerIterator.next();
+ leftIterator = leftTupleSlots.iterator();
+ leftTuple = leftIterator.next();
} else {
- outerIterator = outerTupleSlots.iterator();
- outerTuple = outerIterator.next();
+ leftIterator = leftTupleSlots.iterator();
+ leftTuple = leftIterator.next();
}
- innerTupleSlots.clear();
- if (innext != null) {
- innerTupleSlots.add(innext);
- for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill inner
+ rightTupleSlots.clear();
+ if (rightNext != null) {
+ rightTupleSlots.add(rightNext);
+ for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right
Tuple t = rightChild.next();
if (t == null) {
- innerEnd = true;
+ rightEnd = true;
break;
}
- innerTupleSlots.add(t);
+ rightTupleSlots.add(t);
}
} else {
- for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill inner
+ for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right
Tuple t = rightChild.next();
if (t == null) {
- innerEnd = true;
+ rightEnd = true;
break;
}
- innerTupleSlots.add(t);
+ rightTupleSlots.add(t);
}
}
- if ((innext = rightChild.next()) == null) {
- innerEnd = true;
+ if ((rightNext = rightChild.next()) == null) {
+ rightEnd = true;
}
- innerIterator = innerTupleSlots.iterator();
+ rightIterator = rightTupleSlots.iterator();
}
}
- frameTuple.set(outerTuple, innerIterator.next());
- if (joinQual != null) {
+ frameTuple.set(leftTuple, rightIterator.next());
+ if (hasJoinQual) {
joinQual.eval(qualCtx, inSchema, frameTuple);
if (joinQual.terminate(qualCtx).asBool()) {
- RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outputTuple);
return outputTuple;
}
} else {
- RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outputTuple);
return outputTuple;
}
}
@@ -195,10 +208,10 @@ public class BNLJoinExec extends BinaryPhysicalExec {
@Override
public void rescan() throws IOException {
super.rescan();
- innerEnd = false;
- innerTupleSlots.clear();
- outerTupleSlots.clear();
- innerIterator = innerTupleSlots.iterator();
- outerIterator = outerTupleSlots.iterator();
+ rightEnd = false;
+ rightTupleSlots.clear();
+ leftTupleSlots.clear();
+ rightIterator = rightTupleSlots.iterator();
+ leftIterator = leftTupleSlots.iterator();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 5de6634..30c1fa7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -65,7 +65,7 @@ public class BSTIndexScanExec extends PhysicalExec {
scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
this.reader = new BSTIndex(sm.getFileSystem().getConf()).
getIndexReader(fileName, keySchema, comparator);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index 4723ecc..67d6baa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -22,37 +22,72 @@ import java.util.Stack;
public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
- @Override
- public RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visit(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
throws PhysicalPlanningException {
- if (exec instanceof SeqScanExec) {
- return visitSeqScan((SeqScanExec) exec, stack, context);
- } else if (exec instanceof SelectionExec) {
- return visitSelection((SelectionExec) exec, stack, context);
- } else if (exec instanceof SortExec) {
- return visitSort((SortExec) exec, stack, context);
- } else if (exec instanceof SortAggregateExec) {
- return visitSortAggregation((SortAggregateExec) exec, stack, context);
- } else if (exec instanceof ProjectionExec) {
- return visitProjection((ProjectionExec) exec, stack, context);
+ // Please keep all physical executors listed here, except for abstract classes.
+ // They should be kept in lexicographical order of their names for easy code maintenance.
+ if (exec instanceof BNLJoinExec) {
+ return visitBNLJoin(context, (BNLJoinExec) exec, stack);
+ } else if (exec instanceof BSTIndexScanExec) {
+ return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack);
+ } else if (exec instanceof ColumnPartitionedTableStoreExec) {
+ return visitColumnPartitionedTableStore(context, (ColumnPartitionedTableStoreExec) exec, stack);
+ } else if (exec instanceof EvalExprExec) {
+ return visitEvalExpr(context, (EvalExprExec) exec, stack);
+ } else if (exec instanceof ExternalSortExec) {
+ return visitExternalSort(context, (ExternalSortExec) exec, stack);
+ } else if (exec instanceof HashAggregateExec) {
+ return visitHashAggregate(context, (HashAggregateExec) exec, stack);
+ } else if (exec instanceof HashFullOuterJoinExec) {
+ return visitHashFullOuterJoin(context, (HashFullOuterJoinExec) exec, stack);
} else if (exec instanceof HashJoinExec) {
- return visitHashJoin((HashJoinExec) exec, stack, context);
+ return visitHashJoin(context, (HashJoinExec) exec, stack);
} else if (exec instanceof HashLeftAntiJoinExec) {
- return visitHashAntiJoin((HashLeftAntiJoinExec) exec, stack, context);
+ return visitHashLeftAntiJoin(context, (HashLeftAntiJoinExec) exec, stack);
+ } else if (exec instanceof HashLeftOuterJoinExec) {
+ return visitHashLeftOuterJoin(context, (HashLeftOuterJoinExec) exec, stack);
} else if (exec instanceof HashLeftSemiJoinExec) {
- return visitHashSemiJoin((HashLeftSemiJoinExec) exec, stack, context);
+ return visitLeftHashSemiJoin(context, (HashLeftSemiJoinExec) exec, stack);
+ } else if (exec instanceof HashShuffleFileWriteExec) {
+ return visitHashShuffleFileWrite(context, (HashShuffleFileWriteExec) exec, stack);
+ } else if (exec instanceof HavingExec) {
+ return visitHaving(context, (HavingExec) exec, stack);
} else if (exec instanceof LimitExec) {
- return visitLimit((LimitExec) exec, stack, context);
- } else {
- throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+ return visitLimit(context, (LimitExec) exec, stack);
+ } else if (exec instanceof MemSortExec) {
+ return visitMemSort(context, (MemSortExec) exec, stack);
+ } else if (exec instanceof MergeFullOuterJoinExec) {
+ return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack);
+ } else if (exec instanceof MergeJoinExec) {
+ return visitMergeJoin(context, (MergeJoinExec) exec, stack);
+ } else if (exec instanceof NLJoinExec) {
+ return visitNLJoin(context, (NLJoinExec) exec, stack);
+ } else if (exec instanceof NLLeftOuterJoinExec) {
+ return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
+ } else if (exec instanceof ProjectionExec) {
+ return visitProjection(context, (ProjectionExec) exec, stack);
+ } else if (exec instanceof RangeShuffleFileWriteExec) {
+ return visitRangeShuffleFileWrite(context, (RangeShuffleFileWriteExec) exec, stack);
+ } else if (exec instanceof RightOuterMergeJoinExec) {
+ return visitRightOuterMergeJoin(context, (RightOuterMergeJoinExec) exec, stack);
+ } else if (exec instanceof SelectionExec) {
+ return visitSelection(context, (SelectionExec) exec, stack);
+ } else if (exec instanceof SeqScanExec) {
+ return visitSeqScan(context, (SeqScanExec) exec, stack);
+ } else if (exec instanceof SortAggregateExec) {
+ return visitSortAggregate(context, (SortAggregateExec) exec, stack);
+ } else if (exec instanceof StoreTableExec) {
+ return visitStoreTable(context, (StoreTableExec) exec, stack);
}
+
+ throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
}
private RESULT visitUnaryExecutor(UnaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
throws PhysicalPlanningException {
stack.push(exec);
- RESULT r = visitChild(exec.getChild(), stack, context);
+ RESULT r = visit(exec.getChild(), stack, context);
stack.pop();
return r;
}
@@ -60,68 +95,162 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
private RESULT visitBinaryExecutor(BinaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
throws PhysicalPlanningException {
stack.push(exec);
- RESULT r = visitChild(exec.getLeftChild(), stack, context);
- visitChild(exec.getRightChild(), stack, context);
+ RESULT r = visit(exec.getLeftChild(), stack, context);
+ visit(exec.getRightChild(), stack, context);
stack.pop();
return r;
}
@Override
- public RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
- return visitUnaryExecutor(exec, stack, context);
+ return null;
+ }
+
+ @Override
+ public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
}
@Override
- public RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context) {
+ public RESULT visitColumnPartitionedTableStore(CONTEXT context, ColumnPartitionedTableStoreExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
return null;
}
@Override
- public RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
- return visitUnaryExecutor(exec, stack, context);
+ return null;
}
@Override
- public RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
- return visitBinaryExecutor(exec, stack, context);
+ return null;
}
@Override
- public RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
- return visitUnaryExecutor(exec, stack, context);
+ return null;
}
@Override
- public RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
- return visitUnaryExecutor(exec, stack, context);
+ return null;
}
@Override
- public RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
return visitBinaryExecutor(exec, stack, context);
}
@Override
- public RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
return visitBinaryExecutor(exec, stack, context);
}
@Override
- public RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return visitBinaryExecutor(exec, stack, context);
+ }
+
+ @Override
+ public RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return visitUnaryExecutor(exec, stack, context);
+ }
+
+ @Override
+ public RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack) throws
+ PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
return visitBinaryExecutor(exec, stack, context);
}
@Override
- public RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws
+ PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
return visitUnaryExecutor(exec, stack, context);
}
+
+ @Override
+ public RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return null;
+ }
+
+ @Override
+ public RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return visitUnaryExecutor(exec, stack, context);
+ }
+
+ @Override
+ public RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack) {
+ return null;
+ }
+
+ @Override
+ public RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException {
+ return visitUnaryExecutor(exec, stack, context);
+ }
+
+ @Override
+ public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index c1f84a3..4480747 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -35,9 +35,9 @@ public class EvalExprExec extends PhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema());
this.plan = plan;
- evalContexts = new EvalContext[plan.getExprs().length];
- for (int i = 0; i < plan.getExprs().length; i++) {
- evalContexts[i] = plan.getExprs()[i].getEvalTree().newContext();
+ evalContexts = new EvalContext[plan.getTargets().length];
+ for (int i = 0; i < plan.getTargets().length; i++) {
+ evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
}
}
@@ -50,7 +50,7 @@ public class EvalExprExec extends PhysicalExec {
*/
@Override
public Tuple next() throws IOException {
- Target [] targets = plan.getExprs();
+ Target [] targets = plan.getTargets();
Tuple t = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 681e340..ed2f275 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,11 +18,11 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.HashMap;
@@ -86,29 +86,15 @@ public class HashAggregateExec extends AggregationExec {
}
EvalContext [] ctx;
- if (havingQual == null) {
- if (iterator.hasNext()) {
- ctx = iterator.next().getValue();
+ if (iterator.hasNext()) {
+ ctx = iterator.next().getValue();
for (int i = 0; i < ctx.length; i++) {
tuple.put(i, evals[i].terminate(ctx[i]));
}
return tuple;
- } else {
- return null;
- }
} else {
- while(iterator.hasNext()) {
- ctx = iterator.next().getValue();
- for (int i = 0; i < ctx.length; i++) {
- tuple.put(i, evals[i].terminate(ctx[i]));
- }
- havingQual.eval(havingContext, evalSchema, tuple);
- if (havingQual.terminate(havingContext).asBool()) {
- return tuple;
- }
- }
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 550f0ee..caea5d9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -95,7 +95,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 8a42cff..e08e07d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -60,32 +60,32 @@ public class HashJoinExec extends BinaryPhysicalExec {
protected final Projector projector;
protected final EvalContext [] evalContexts;
- public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
- PhysicalExec inner) {
- super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
- plan.getOutSchema(), outer, inner);
+ public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
+ PhysicalExec rightExec) {
+ super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
+ leftExec, rightExec);
this.plan = plan;
this.joinQual = plan.getJoinQual();
this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
- outer.getSchema(), inner.getSchema());
+ leftExec.getSchema(), rightExec.getSchema());
leftKeyList = new int[joinKeyPairs.size()];
rightKeyList = new int[joinKeyPairs.size()];
for (int i = 0; i < joinKeyPairs.size(); i++) {
- leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+ leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
}
for (int i = 0; i < joinKeyPairs.size(); i++) {
- rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+ rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
}
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index f0022cb..841ff5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -93,7 +93,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
new file mode 100644
index 0000000..41ff7bf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.HavingNode;
+import org.apache.tajo.engine.planner.logical.SelectionNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/** Physical operator for HAVING: passes through only child tuples for which the qual is true. */
+public class HavingExec extends UnaryPhysicalExec {
+  private final EvalNode qual;
+  private final EvalContext qualCtx;
+
+  /** Takes the filter predicate from {@code plan}; tuples are pulled from {@code child}. */
+  public HavingExec(TaskAttemptContext context, HavingNode plan, PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.qual = plan.getQual();
+    this.qualCtx = this.qual.newContext();
+  }
+
+  /**
+   * Returns the next child tuple satisfying the predicate, unprojected; {@code null} at end of input.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while ((tuple = child.next()) != null) {
+      qual.eval(qualCtx, inSchema, tuple);
+      // isTrue() rather than asBool(): the same test SelectionExec applies to its qual result.
+      if (qual.terminate(qualCtx).isTrue()) {
+        return tuple;
+      }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index e1fe440..446755d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -93,7 +93,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index ebddd07..e128fea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -89,7 +89,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 67908da..055c66e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -59,7 +59,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.renew();
+ evalContexts = projector.newContexts();
// for join
needNewOuter = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index ebc78e2..305cdd2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -62,7 +62,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.renew();
+ evalContexts = projector.newContexts();
// for join
needNextRightTuple = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index f0abef5..9ede15d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -21,15 +21,82 @@ package org.apache.tajo.engine.planner.physical;
import java.util.Stack;
public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { // one visit method per concrete executor type; each receives a CONTEXT and yields a RESULT
- RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context);
- RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
- RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+
+ RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack) // every method below has this shape: context, then the executor, then the stack (presumably the enclosing executors -- TODO confirm)
+ throws PhysicalPlanningException;
+
+ RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitColumnPartitionedTableStore(CONTEXT context, ColumnPartitionedTableStoreExec exec,
+ Stack<PhysicalExec> stack) throws PhysicalPlanningException;
+
+ RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack) // NOTE(review): named "LeftHash" while its siblings use "HashLeft" (visitHashLeftAntiJoin, visitHashLeftOuterJoin)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index bd773ed..fdd1839 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -23,17 +23,16 @@ import java.util.Stack;
public class PhysicalPlanUtil {
public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
throws PhysicalPlanningException {
- return (T) new FindVisitor().visitChild(plan, new Stack<PhysicalExec>(), clazz);
+ return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz); // unchecked cast: T is not tied to clazz, so the caller must ask for a matching type
}
private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> { // context = class being searched for, result = the executor found
- public PhysicalExec visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+ public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
throws PhysicalPlanningException {
-
if (target.isAssignableFrom(exec.getClass())) { // exec is an instance of target or of one of its subclasses
return exec; // match: hand this executor back without looking further
} else {
- return super.visitChild(exec, stack, target);
+ return super.visit(exec, stack, target); // no match: defer to the base visitor's handling of exec
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index f8ec801..c57089e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,24 +21,24 @@
*/
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.planner.logical.Projectable;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.ProjectionNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import java.io.IOException;
public class ProjectionExec extends UnaryPhysicalExec {
- private final ProjectionNode plan;
+ private final Projectable plan;
// for projection
private Tuple outTuple;
private EvalContext[] evalContexts;
private Projector projector;
- public ProjectionExec(TaskAttemptContext context, ProjectionNode plan,
+ public ProjectionExec(TaskAttemptContext context, Projectable plan,
PhysicalExec child) {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
@@ -49,7 +49,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
this.outTuple = new VTuple(outSchema.getColumnNum());
this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 8c99215..2f1d33d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -91,7 +91,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 0e6640c..5158dc0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,36 +18,24 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class SelectionExec extends UnaryPhysicalExec {
private final EvalNode qual;
private final EvalContext qualCtx;
- private final Tuple outputTuple;
- // projection
- private int [] targetIds;
public SelectionExec(TaskAttemptContext context,
SelectionNode plan,
PhysicalExec child) {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
-
this.qual = plan.getQual(); // filter predicate comes from the logical selection node
this.qualCtx = this.qual.newContext(); // evaluation context created once here and kept in a final field
- // for projection
- if (!inSchema.equals(outSchema)) {
- targetIds = RowStoreUtil.getTargetIds(inSchema, outSchema);
- }
-
- this.outputTuple = new VTuple(outSchema.getColumnNum());
}
@Override
@@ -55,13 +43,8 @@ public class SelectionExec extends UnaryPhysicalExec {
Tuple tuple;
while ((tuple = child.next()) != null) {
qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
- if (targetIds != null) {
- RowStoreUtil.project(tuple, outputTuple, targetIds);
- return outputTuple;
- } else {
+ if (qual.terminate(qualCtx).isTrue()) {
return tuple;
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 3de75a9..6d3d6b1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -151,7 +151,7 @@ public class SeqScanExec extends PhysicalExec {
}
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
+ this.evalContexts = projector.newContexts();
if (fragments.length > 1) {
this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 44af891..ddcdd03 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -43,7 +43,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
@Override
public boolean isEligible(LogicalPlan plan) {
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
- if (block.hasSelectionNode() || block.hasJoinNode()) {
+ if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 4a7a324..72ff67c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.rewrite;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -359,7 +359,8 @@ public class PartitionedTableRewriter implements RewriteRule {
try {
Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
- PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+ PartitionedTableScanNode rewrittenScanNode =
+ new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
updateTableStat(rewrittenScanNode);
PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
} catch (IOException e) {