You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/25 07:35:02 UTC
[2/3] TAJO-539: Change some EvalNode::eval to directly return a Datum
value.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 341a58a..3e756d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -31,9 +31,12 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
@@ -212,15 +215,12 @@ public class GlobalPlanner {
// setup current block
ExecutionBlock currentBlock = context.plan.newExecutionBlock();
LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
- for (Target target : groupbyNode.getTargets()) {
- List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
- for (AggregationFunctionCallEval function : functions) {
- if (function.isDistinct()) {
- columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
- } else {
- // See the comment of this method. the aggregation function should be executed as the first phase.
- function.setFirstPhase();
- }
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ if (function.isDistinct()) {
+ columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+ } else {
+ // See the comment of this method. the aggregation function should be executed as the first phase.
+ function.setFirstPhase();
}
}
@@ -247,65 +247,140 @@ public class GlobalPlanner {
return currentBlock;
}
- private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+ private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
GroupbyNode groupbyNode) throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
- if (groupbyNode.isDistinct()) {
- return buildDistinctGroupBy(context, childBlock, groupbyNode);
+ if (groupbyNode.isDistinct()) { // if there is at least one distinct aggregation function
+ return buildDistinctGroupBy(context, lastBlock, groupbyNode);
} else {
- GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
+ GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
- if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
- ((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+ if (hasUnionChild(firstPhaseGroupby)) {
+ currentBlock = buildGroupbyAndUnionPlan(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+ } else {
+ // general hash-shuffled aggregation
+ currentBlock = buildTwoPhaseGroupby(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+ }
+ }
- currentBlock = childBlock;
- for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
- if (firstPhaseGroupBy.isEmptyGrouping()) {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
- } else {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
- }
- dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+ return currentBlock;
+ }
- ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
- GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan(), firstPhaseGroupBy);
- g1.setChild(subBlock.getPlan());
- subBlock.setPlan(g1);
-
- GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan(), groupbyNode);
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- g2.setChild(scanNode);
- currentBlock.setPlan(g2);
- }
- } else { // general hash-shuffled aggregation
- childBlock.setPlan(firstPhaseGroupBy);
- currentBlock = masterPlan.newExecutionBlock();
+ public boolean hasUnionChild(UnaryNode node) {
- DataChannel channel;
- if (firstPhaseGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
- channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
- } else {
- channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
- channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
- }
- channel.setSchema(firstPhaseGroupBy.getOutSchema());
- channel.setStoreType(storeType);
+ if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) {
+ TableSubQueryNode tableSubQuery = node.getChild();
+ return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+ }
+
+ return false;
+ }
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- groupbyNode.setChild(scanNode);
- groupbyNode.setInSchema(scanNode.getOutSchema());
- currentBlock.setPlan(groupbyNode);
- masterPlan.addConnect(channel);
+ private static ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+ GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+ DataChannel lastDataChannel = null;
+
+ // It pushes down the first phase group-by operator into all child blocks.
+ //
+ // (second phase) G (currentBlock)
+ // /|\
+ // / / | \
+ // (first phase) G G G G (child block)
+
+ // They are already connected to one another.
+ // So, we don't need to connect them again.
+ for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+ if (firstPhaseGroupBy.isEmptyGrouping()) {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+ } else {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
}
+ dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+ ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+ // Why must firstPhaseGroupby be copied?
+ //
+ // A groupby in each execution block can have a different child.
+ // It affects groupby's input schema.
+ GroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+ firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+ childBlock.setPlan(firstPhaseGroupbyCopy);
+
+ // just keep the last data channel.
+ lastDataChannel = dataChannel;
+ }
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+ secondPhaseGroupBy.setChild(scanNode);
+ lastBlock.setPlan(secondPhaseGroupBy);
+ return lastBlock;
+ }
+
+ private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
+ GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+ ExecutionBlock childBlock = latestBlock;
+ childBlock.setPlan(firstPhaseGroupby);
+ ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel channel;
+ if (firstPhaseGroupby.isEmptyGrouping()) {
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+ channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
+ } else {
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+ channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
}
+ channel.setSchema(firstPhaseGroupby.getOutSchema());
+ channel.setStoreType(storeType);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ secondPhaseGroupby.setChild(scanNode);
+ secondPhaseGroupby.setInSchema(scanNode.getOutSchema());
+ currentBlock.setPlan(secondPhaseGroupby);
+
+ masterPlan.addConnect(channel);
return currentBlock;
}
+ public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode groupBy) {
+ Preconditions.checkNotNull(groupBy);
+
+ GroupbyNode firstPhaseGroupBy = PlannerUtil.clone(plan, groupBy);
+ GroupbyNode secondPhaseGroupBy = groupBy;
+
+ // Set first phase expressions
+ if (secondPhaseGroupBy.hasAggFunctions()) {
+ int evalNum = secondPhaseGroupBy.getAggFunctions().length;
+ AggregationFunctionCallEval [] secondPhaseEvals = secondPhaseGroupBy.getAggFunctions();
+ AggregationFunctionCallEval [] firstPhaseEvals = new AggregationFunctionCallEval[evalNum];
+
+ String [] firstPhaseEvalNames = new String[evalNum];
+ for (int i = 0; i < evalNum; i++) {
+ try {
+ firstPhaseEvals[i] = (AggregationFunctionCallEval) secondPhaseEvals[i].clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+
+ firstPhaseEvals[i].setFirstPhase();
+ firstPhaseEvalNames[i] = plan.newGeneratedFieldName(firstPhaseEvals[i]);
+ FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+ secondPhaseEvals[i].setArgs(new EvalNode[] {param});
+ }
+
+ secondPhaseGroupBy.setAggFunctions(secondPhaseEvals);
+ firstPhaseGroupBy.setAggFunctions(firstPhaseEvals);
+ Target [] firstPhaseTargets = ProjectionPushDownRule.buildGroupByTarget(firstPhaseGroupBy, firstPhaseEvalNames);
+ firstPhaseGroupBy.setTargets(firstPhaseTargets);
+ secondPhaseGroupBy.setInSchema(PlannerUtil.targetToSchema(firstPhaseTargets));
+ }
+ return firstPhaseGroupBy;
+ }
+
private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 41f1e88..8acd32e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -20,14 +20,22 @@ package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.util.TUtil;
public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
- @Expose private Column [] columns;
- @Expose private Target [] targets;
+ /** Grouping key sets */
+ @Expose private Column [] groupingColumns;
+ /** Aggregation Functions */
+ @Expose private AggregationFunctionCallEval [] aggrFunctions;
+ /**
+ * It's a list of targets. The grouping columns should be followed by aggregation functions.
+ * aggrFunctions keeps the actual aggregation functions, whereas this target list only contains field references.
+ * */
+ @Expose private Target [] targets;
@Expose private boolean hasDistinct = false;
public GroupbyNode(int pid) {
@@ -35,15 +43,15 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
}
public final boolean isEmptyGrouping() {
- return columns == null || columns.length == 0;
+ return groupingColumns == null || groupingColumns.length == 0;
}
public void setGroupingColumns(Column [] groupingColumns) {
- this.columns = groupingColumns;
+ this.groupingColumns = groupingColumns;
}
public final Column [] getGroupingColumns() {
- return this.columns;
+ return this.groupingColumns;
}
public final boolean isDistinct() {
@@ -54,6 +62,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
hasDistinct = distinct;
}
+ public boolean hasAggFunctions() {
+ return this.aggrFunctions != null;
+ }
+
+ public AggregationFunctionCallEval [] getAggFunctions() {
+ return this.aggrFunctions;
+ }
+
+ public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+ this.aggrFunctions = evals;
+ }
+
@Override
public boolean hasTargets() {
return this.targets != null;
@@ -76,9 +96,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
public String toString() {
StringBuilder sb = new StringBuilder("\"GroupBy\": {\"grouping fields\":[");
- for (int i=0; i < columns.length; i++) {
- sb.append("\"").append(columns[i]).append("\"");
- if(i < columns.length - 1)
+ for (int i=0; i < groupingColumns.length; i++) {
+ sb.append("\"").append(groupingColumns[i]).append("\"");
+ if(i < groupingColumns.length - 1)
sb.append(",");
}
@@ -92,6 +112,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
}
sb.append("],");
}
+ if (aggrFunctions != null) {
+ sb.append("\n \"expr\": ").append(TUtil.arrayToString(aggrFunctions)).append(",");
+ }
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
sb.append("\n \"in schema\": ").append(getInSchema());
sb.append("}");
@@ -104,7 +127,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
if (obj instanceof GroupbyNode) {
GroupbyNode other = (GroupbyNode) obj;
boolean eq = super.equals(other);
- eq = eq && TUtil.checkEquals(columns, other.columns);
+ eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+ eq = eq && TUtil.checkEquals(aggrFunctions, other.aggrFunctions);
eq = eq && TUtil.checkEquals(targets, other.targets);
return eq;
} else {
@@ -115,10 +139,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
GroupbyNode grp = (GroupbyNode) super.clone();
- if (columns != null) {
- grp.columns = new Column[columns.length];
- for (int i = 0; i < columns.length; i++) {
- grp.columns[i] = (Column) columns[i].clone();
+ if (groupingColumns != null) {
+ grp.groupingColumns = new Column[groupingColumns.length];
+ for (int i = 0; i < groupingColumns.length; i++) {
+ grp.groupingColumns[i] = (Column) groupingColumns[i].clone();
+ }
+ }
+
+ if (aggrFunctions != null) {
+ grp.aggrFunctions = new AggregationFunctionCallEval[aggrFunctions.length];
+ for (int i = 0; i < aggrFunctions.length; i++) {
+ grp.aggrFunctions[i] = (AggregationFunctionCallEval) aggrFunctions[i].clone();
}
}
@@ -138,7 +169,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
StringBuilder sb = new StringBuilder();
sb.append("(");
- Column [] groupingColumns = columns;
+ Column [] groupingColumns = this.groupingColumns;
for (int j = 0; j < groupingColumns.length; j++) {
sb.append(groupingColumns[j].getColumnName());
if(j < groupingColumns.length - 1) {
@@ -150,6 +181,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
planStr.appendTitle(sb.toString());
+ sb = new StringBuilder();
+ sb.append("(");
+ for (int j = 0; j < aggrFunctions.length; j++) {
+ sb.append(aggrFunctions[j]);
+ if(j < aggrFunctions.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
sb = new StringBuilder("target list: ");
for (int i = 0; i < targets.length; i++) {
sb.append(targets[i]);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index ed82cef..eb69703 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -18,31 +18,22 @@
package org.apache.tajo.engine.planner.physical;
-import com.google.common.collect.Sets;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.util.List;
-import java.util.Set;
public abstract class AggregationExec extends UnaryPhysicalExec {
protected GroupbyNode plan;
- protected Set<Column> nonNullGroupingFields;
- protected int keylist [];
- protected int measureList[];
- protected final EvalNode evals [];
- protected EvalContext evalContexts [];
+ protected final int groupingKeyNum;
+ protected int groupingKeyIds[];
+ protected final int aggFunctionsNum;
+ protected final AggregationFunctionCallEval aggFunctions[];
+
protected Schema evalSchema;
public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
@@ -52,47 +43,26 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
evalSchema = plan.getOutSchema();
- nonNullGroupingFields = Sets.newHashSet();
- // keylist will contain a list of IDs of grouping column
- keylist = new int[plan.getGroupingColumns().length];
+ final Column [] keyColumns = plan.getGroupingColumns();
+ groupingKeyNum = keyColumns.length;
+ groupingKeyIds = new int[groupingKeyNum];
Column col;
for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
- col = plan.getGroupingColumns()[idx];
- keylist[idx] = inSchema.getColumnId(col.getQualifiedName());
- nonNullGroupingFields.add(col);
- }
-
- // measureList will contain a list of measure field indexes against the target list.
- List<Integer> measureIndexes = TUtil.newList();
- for (int i = 0; i < plan.getTargets().length; i++) {
- Target target = plan.getTargets()[i];
- if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
- measureIndexes.add(i);
- }
- }
-
- measureList = new int[measureIndexes.size()];
- for (int i = 0; i < measureIndexes.size(); i++) {
- measureList[i] = measureIndexes.get(i);
+ col = keyColumns[idx];
+ groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
}
- evals = new EvalNode[plan.getTargets().length];
- evalContexts = new EvalContext[plan.getTargets().length];
- for (int i = 0; i < plan.getTargets().length; i++) {
- Target t = plan.getTargets()[i];
- if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
- evals[i] = new ConstEval(DatumFactory.createNullDatum());
- evalContexts[i] = evals[i].newContext();
- } else {
- evals[i] = t.getEvalTree();
- evalContexts[i] = evals[i].newContext();
- }
+ if (plan.hasAggFunctions()) {
+ aggFunctions = plan.getAggFunctions();
+ aggFunctionsNum = aggFunctions.length;
+ } else {
+ aggFunctions = new AggregationFunctionCallEval[0];
+ aggFunctionsNum = 0;
}
}
@Override
public void close() throws IOException {
super.close();
- nonNullGroupingFields.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index cf56200..b39a9f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,15 +18,14 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +37,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
private final JoinNode plan;
private final boolean hasJoinQual;
private final EvalNode joinQual;
- private final EvalContext qualCtx;
private final List<Tuple> leftTupleSlots;
private final List<Tuple> rightTupleSlots;
@@ -58,7 +56,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -67,10 +64,8 @@ public class BNLJoinExec extends BinaryPhysicalExec {
this.joinQual = plan.getJoinQual();
if (joinQual != null) { // if join type is not 'cross join'
hasJoinQual = true;
- this.qualCtx = this.joinQual.newContext();
} else {
hasJoinQual = false;
- this.qualCtx = null;
}
this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
@@ -85,7 +80,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
}
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -191,15 +185,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
frameTuple.set(leftTuple, rightIterator.next());
if (hasJoinQual) {
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outputTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outputTuple);
return outputTuple;
}
} else {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outputTuple);
+ projector.eval(frameTuple, outputTuple);
return outputTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 30c1fa7..2ff6fc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,16 +19,15 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -37,11 +36,9 @@ public class BSTIndexScanExec extends PhysicalExec {
private SeekableScanner fileScanner;
private EvalNode qual;
- private EvalContext qualCtx;
private BSTIndex.BSTIndexReader reader;
private final Projector projector;
- private EvalContext [] evalContexts;
private Datum[] datum = null;
@@ -54,18 +51,12 @@ public class BSTIndexScanExec extends PhysicalExec {
super(context, scanNode.getInSchema(), scanNode.getOutSchema());
this.scanNode = scanNode;
this.qual = scanNode.getQual();
- if(this.qual == null) {
- this.qualCtx = null;
- } else {
- this.qualCtx = this.qual.newContext();
- }
this.datum = datum;
this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
- this.evalContexts = projector.newContexts();
this.reader = new BSTIndex(sm.getFileSystem().getConf()).
getIndexReader(fileName, keySchema, comparator);
@@ -109,18 +100,15 @@ public class BSTIndexScanExec extends PhysicalExec {
Tuple outTuple = new VTuple(this.outSchema.getColumnNum());
if (!scanNode.hasQual()) {
if ((tuple = fileScanner.next()) != null) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
return outTuple;
} else {
return null;
}
} else {
- while( reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ projector.eval(tuple, outTuple);
return outTuple;
} else {
fileScanner.seek(reader.next());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 4480747..83580f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,43 +18,32 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.EvalExprNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class EvalExprExec extends PhysicalExec {
private final EvalExprNode plan;
- private final EvalContext[] evalContexts;
public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
super(context, plan.getInSchema(), plan.getOutSchema());
this.plan = plan;
-
- evalContexts = new EvalContext[plan.getTargets().length];
- for (int i = 0; i < plan.getTargets().length; i++) {
- evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
- }
}
@Override
public void init() throws IOException {
}
- /* (non-Javadoc)
- * @see PhysicalExec#next()
- */
@Override
public Tuple next() throws IOException {
Target [] targets = plan.getTargets();
Tuple t = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
- targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);
- t.put(i, targets[i].getEvalTree().terminate(evalContexts[i]));
+ t.put(i, targets[i].getEvalTree().eval(inSchema, null));
}
return t;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index ed2f275..d533b82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,7 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -35,44 +35,38 @@ import java.util.Map.Entry;
*/
public class HashAggregateExec extends AggregationExec {
private Tuple tuple = null;
- private Map<Tuple, EvalContext[]> tupleSlots;
+ private Map<Tuple, FunctionContext[]> hashTable;
private boolean computed = false;
- private Iterator<Entry<Tuple, EvalContext []>> iterator = null;
+ private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
- /**
- * @throws java.io.IOException
- *
- */
- public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode annotation,
- PhysicalExec subOp) throws IOException {
- super(ctx, annotation, subOp);
- tupleSlots = new HashMap<Tuple, EvalContext[]>(10000);
- this.tuple = new VTuple(evalSchema.getColumnNum());
+ public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+ super(ctx, plan, subOp);
+ hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+ this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
}
private void compute() throws IOException {
Tuple tuple;
Tuple keyTuple;
- int targetLength = plan.getTargets().length;
while((tuple = child.next()) != null && !context.isStopped()) {
- keyTuple = new VTuple(keylist.length);
+ keyTuple = new VTuple(groupingKeyIds.length);
// build one key tuple
- for(int i = 0; i < keylist.length; i++) {
- keyTuple.put(i, tuple.get(keylist[i]));
+ for(int i = 0; i < groupingKeyIds.length; i++) {
+ keyTuple.put(i, tuple.get(groupingKeyIds[i]));
}
- if(tupleSlots.containsKey(keyTuple)) {
- EvalContext [] tmpTuple = tupleSlots.get(keyTuple);
- for(int i = 0; i < measureList.length; i++) {
- evals[measureList[i]].eval(tmpTuple[measureList[i]], inSchema, tuple);
+ if(hashTable.containsKey(keyTuple)) {
+ FunctionContext [] contexts = hashTable.get(keyTuple);
+ for(int i = 0; i < aggFunctions.length; i++) {
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
} else { // if the key occurs firstly
- EvalContext evalCtx [] = new EvalContext[targetLength];
- for(int i = 0; i < targetLength; i++) {
- evalCtx[i] = evals[i].newContext();
- evals[i].eval(evalCtx[i], inSchema, tuple);
+ FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+ for(int i = 0; i < aggFunctionsNum; i++) {
+ contexts[i] = aggFunctions[i].newContext();
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
- tupleSlots.put(keyTuple, evalCtx);
+ hashTable.put(keyTuple, contexts);
}
}
}
@@ -81,16 +75,23 @@ public class HashAggregateExec extends AggregationExec {
public Tuple next() throws IOException {
if(!computed) {
compute();
- iterator = tupleSlots.entrySet().iterator();
+ iterator = hashTable.entrySet().iterator();
computed = true;
}
- EvalContext [] ctx;
+ FunctionContext [] contexts;
if (iterator.hasNext()) {
- ctx = iterator.next().getValue();
- for (int i = 0; i < ctx.length; i++) {
- tuple.put(i, evals[i].terminate(ctx[i]));
+ Entry<Tuple, FunctionContext []> entry = iterator.next();
+ Tuple keyTuple = entry.getKey();
+ contexts = entry.getValue();
+
+ int tupleIdx = 0;
+ for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+ tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+ }
+ for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+ tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
}
return tuple;
@@ -101,12 +102,12 @@ public class HashAggregateExec extends AggregationExec {
@Override
public void rescan() throws IOException {
- iterator = tupleSlots.entrySet().iterator();
+ iterator = hashTable.entrySet().iterator();
}
@Override
public void close() throws IOException {
super.close();
- tupleSlots.clear();
+ hashTable.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index caea5d9..848e362 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,9 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
@@ -48,7 +47,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -60,7 +58,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -72,7 +69,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
plan.getOutSchema(), outer, inner);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
// this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
@@ -95,7 +91,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -156,8 +151,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
} else {
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
@@ -173,8 +167,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
shouldGetLeftTuple = true;
return outTuple;
@@ -184,10 +177,9 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
- if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
getKeyLeftTuple(leftTuple, leftKeyTuple);
matched.put(leftKeyTuple, true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index e08e07d..08b7035 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,9 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -29,6 +27,7 @@ import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
@@ -46,7 +45,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -58,7 +56,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
PhysicalExec rightExec) {
@@ -66,7 +63,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
leftExec, rightExec);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
@@ -85,7 +81,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -131,10 +126,8 @@ public class HashJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 6b2d7b8..d7437c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -80,8 +80,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
} else {
// if not found, it returns a tuple.
frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
@@ -91,16 +90,14 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
while (notFound && iterator.hasNext()) {
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
notFound = false;
}
}
if (notFound) { // if there is no matched tuple
frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 841ff5a..d0c9897 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,9 +18,9 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,13 +30,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// from logical plan
@@ -51,7 +49,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -63,7 +60,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -75,7 +71,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
plan.getOutSchema(), leftChild, rightChild);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
@@ -93,7 +88,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -137,8 +131,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
shouldGetLeftTuple = true;
return outTuple;
@@ -148,10 +141,8 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index c5e2d24..842c2e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -89,11 +89,9 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
while (notFound && iterator.hasNext()) {
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
notFound = false;
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index 41ff7bf..0418f65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -18,21 +18,15 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.HavingNode;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class HavingExec extends UnaryPhysicalExec {
private final EvalNode qual;
- private final EvalContext qualCtx;
- private final Tuple outputTuple;
public HavingExec(TaskAttemptContext context,
HavingNode plan,
@@ -40,17 +34,14 @@ public class HavingExec extends UnaryPhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.qual = plan.getQual();
- this.qualCtx = this.qual.newContext();
- this.outputTuple = new VTuple(outSchema.getColumnNum());
}
@Override
public Tuple next() throws IOException {
Tuple tuple;
while ((tuple = child.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
- return tuple;
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 446755d..1d6da3f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -41,7 +40,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -62,7 +60,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -78,7 +75,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -93,7 +89,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -148,8 +143,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
rightTuple = rightChild.next();
return outTuple;
@@ -159,8 +153,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
leftTuple = leftChild.next();
return outTuple;
@@ -206,8 +199,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// BEFORE RETURN, MOVE FORWARD
rightTuple = rightChild.next();
if(rightTuple == null) {
@@ -221,8 +213,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
// BEFORE RETURN, MOVE FORWARD
leftTuple = leftChild.next();
@@ -297,9 +288,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
posRightTupleSlots = posRightTupleSlots + 1;
frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
} else {
// right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
@@ -312,9 +302,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
posLeftTupleSlots = posLeftTupleSlots + 1;
frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e128fea..e1d377e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -63,7 +61,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -72,7 +69,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -89,7 +85,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -165,10 +160,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
}
frameTuple.set(outerNext, innerIterator.next());
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 055c66e..961be93 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -18,14 +18,13 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -41,10 +40,8 @@ public class NLJoinExec extends BinaryPhysicalExec {
private Tuple outerTuple = null;
private Tuple innerTuple = null;
private Tuple outTuple = null;
- private EvalContext qualCtx;
// projection
- private final EvalContext [] evalContexts;
private final Projector projector;
public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
@@ -54,12 +51,10 @@ public class NLJoinExec extends BinaryPhysicalExec {
if (plan.hasJoinQual()) {
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
}
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
needNewOuter = true;
@@ -90,15 +85,12 @@ public class NLJoinExec extends BinaryPhysicalExec {
frameTuple.set(outerTuple, innerTuple);
if (joinQual != null) {
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
} else {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 305cdd2..4abe570 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -18,8 +18,6 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
@@ -27,6 +25,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -41,10 +40,8 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
private Tuple leftTuple = null;
private Tuple rightTuple = null;
private Tuple outTuple = null;
- private EvalContext qualCtx;
// projection
- private final EvalContext [] evalContexts;
private final Projector projector;
private boolean foundAtLeastOneMatch;
@@ -57,12 +54,10 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
if (plan.hasJoinQual()) {
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
}
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
needNextRightTuple = true;
@@ -96,8 +91,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
foundAtLeastOneMatch = true;
needNextRightTuple = true;
@@ -111,10 +105,9 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
}
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).isTrue()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ ;
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
foundAtLeastOneMatch = true;
return outTuple;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index c57089e..ecc6dd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,12 +21,11 @@
*/
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.planner.logical.Projectable;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -35,7 +34,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
// for projection
private Tuple outTuple;
- private EvalContext[] evalContexts;
private Projector projector;
public ProjectionExec(TaskAttemptContext context, Projectable plan,
@@ -49,7 +47,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
this.outTuple = new VTuple(outSchema.getColumnNum());
this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
- this.evalContexts = projector.newContexts();
}
@Override
@@ -60,8 +57,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
return null;
}
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 2f1d33d..365faba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -19,10 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -61,7 +59,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -77,7 +74,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -91,7 +87,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -158,8 +153,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
rightTuple = rightChild.next();
@@ -225,8 +219,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded left tuple
Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
// BEFORE RETURN, MOVE FORWARD
@@ -305,9 +298,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
posRightTupleSlots = posRightTupleSlots + 1;
frameTuple.set(nextLeft, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
} else {
@@ -321,9 +313,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
posLeftTupleSlots = posLeftTupleSlots + 1;
frameTuple.set(nextLeft, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 5158dc0..2e676e9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.SelectionNode;
import org.apache.tajo.storage.Tuple;
@@ -28,23 +27,20 @@ import java.io.IOException;
public class SelectionExec extends UnaryPhysicalExec {
private final EvalNode qual;
- private final EvalContext qualCtx;
public SelectionExec(TaskAttemptContext context,
SelectionNode plan,
PhysicalExec child) {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.qual = plan.getQual();
- this.qualCtx = this.qual.newContext();
}
@Override
public Tuple next() throws IOException {
Tuple tuple;
while ((tuple = child.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).isTrue()) {
- return tuple;
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 6d3d6b1..c38be92 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,21 +18,24 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.HashSet;
@@ -46,12 +49,10 @@ public class SeqScanExec extends PhysicalExec {
private Scanner scanner = null;
private EvalNode qual = null;
- private EvalContext qualCtx;
private CatalogProtos.FragmentProto [] fragments;
private Projector projector;
- private EvalContext [] evalContexts;
public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
@@ -60,12 +61,6 @@ public class SeqScanExec extends PhysicalExec {
this.plan = plan;
this.qual = plan.getQual();
this.fragments = fragments;
-
- if (qual == null) {
- qualCtx = null;
- } else {
- qualCtx = this.qual.newContext();
- }
}
/**
@@ -100,9 +95,7 @@ public class SeqScanExec extends PhysicalExec {
// However, actual values absent in tuples. So, Replace all column references by constant datum.
for (Column column : columnPartitionSchema.toArray()) {
FieldEval targetExpr = new FieldEval(column);
- EvalContext evalContext = targetExpr.newContext();
- targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
- Datum datum = targetExpr.terminate(evalContext);
+ Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
ConstEval constExpr = new ConstEval(datum);
for (Target target : plan.getTargets()) {
if (target.getEvalTree().equals(targetExpr)) {
@@ -151,7 +144,6 @@ public class SeqScanExec extends PhysicalExec {
}
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
if (fragments.length > 1) {
this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
@@ -172,8 +164,7 @@ public class SeqScanExec extends PhysicalExec {
if (!plan.hasQual()) {
if ((tuple = scanner.next()) != null) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
outTuple.setOffset(tuple.getOffset());
return outTuple;
} else {
@@ -181,10 +172,9 @@ public class SeqScanExec extends PhysicalExec {
}
} else {
while ((tuple = scanner.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).isTrue()) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ projector.eval(tuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index af1bf34..dbe45dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,82 +18,108 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
/**
- * This is the sort-based Aggregation Operator.
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Sort Aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * The operator enters this state when lastKey is null or equals the current key.
+ * In this state, it accumulates measure values via the aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * The operator enters this state when currentKey differs from lastKey. It computes the final
+ * aggregation results for the finished group and then emits an output tuple.
*/
public class SortAggregateExec extends AggregationExec {
- private Tuple prevKey = null;
+ private Tuple lastKey = null;
private boolean finished = false;
+ private FunctionContext contexts[];
-
- public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan,
- PhysicalExec child) throws IOException {
+ public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
super(context, plan, child);
+ contexts = new FunctionContext[plan.getAggFunctions().length];
}
@Override
public Tuple next() throws IOException {
- Tuple curKey;
+ Tuple currentKey;
Tuple tuple;
- Tuple finalTuple = null;
+ Tuple outputTuple = null;
+
while(!context.isStopped() && (tuple = child.next()) != null) {
- // build a key tuple
- curKey = new VTuple(keylist.length);
- for(int i = 0; i < keylist.length; i++) {
- curKey.put(i, tuple.get(keylist[i]));
+
+ // get a key tuple
+ currentKey = new VTuple(groupingKeyIds.length);
+ for(int i = 0; i < groupingKeyIds.length; i++) {
+ currentKey.put(i, tuple.get(groupingKeyIds[i]));
}
- if (prevKey == null || prevKey.equals(curKey)) {
- if (prevKey == null) {
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- evalContexts[i] = evals[i].newContext();
- evals[i].eval(evalContexts[i], inSchema, tuple);
+ /** Aggregation State */
+ if (lastKey == null || lastKey.equals(currentKey)) {
+ if (lastKey == null) {
+ for(int i = 0; i < aggFunctionsNum; i++) {
+ contexts[i] = aggFunctions[i].newContext();
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
- prevKey = curKey;
+ lastKey = currentKey;
} else {
// aggregate
- for (int idx : measureList) {
- evals[idx].eval(evalContexts[idx], inSchema, tuple);
+ for (int i = 0; i < aggFunctionsNum; i++) {
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
}
- } else {
+
+ } else { /** Finalization State */
// finalize aggregate and return
- finalTuple = new VTuple(outSchema.getColumnNum());
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+ outputTuple = new VTuple(outSchema.getColumnNum());
+ int tupleIdx = 0;
+
+ for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+ outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+ }
+ for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+ outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
}
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- evalContexts[i] = evals[i].newContext();
- evals[i].eval(evalContexts[i], inSchema, tuple);
+ for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) {
+ contexts[evalIdx] = aggFunctions[evalIdx].newContext();
+ aggFunctions[evalIdx].merge(contexts[evalIdx], inSchema, tuple);
}
- prevKey = curKey;
- return finalTuple;
+
+ lastKey = currentKey;
+ return outputTuple;
}
} // while loop
if (!finished) {
- finalTuple = new VTuple(outSchema.getColumnNum());
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+ outputTuple = new VTuple(outSchema.getColumnNum());
+ int tupleIdx = 0;
+ for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+ outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+ }
+ for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+ outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
}
finished = true;
}
- return finalTuple;
+ return outputTuple;
}
@Override
public void rescan() throws IOException {
super.rescan();
- prevKey = null;
+ lastKey = null;
finished = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 72ff67c..041220a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -95,16 +95,13 @@ public class PartitionedTableRewriter implements RewriteRule {
}
private static class PartitionPathFilter implements PathFilter {
- private FileSystem fs;
private Schema schema;
private EvalNode partitionFilter;
- private EvalContext evalContext;
public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
this.schema = schema;
this.partitionFilter = partitionFilter;
- evalContext = partitionFilter.newContext();
}
@Override
@@ -113,8 +110,8 @@ public class PartitionedTableRewriter implements RewriteRule {
if (tuple == null) { // if it is a file or not acceptable file
return false;
}
- partitionFilter.eval(evalContext, schema, tuple);
- return partitionFilter.terminate(evalContext).asBool();
+
+ return partitionFilter.eval(schema, tuple).asBool();
}
@Override