You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/25 07:35:01 UTC
[1/3] TAJO-539: Change some EvalNode::eval to directly return a Datum
value.
Updated Branches:
refs/heads/master 0386268e8 -> e23e78ccd
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 6264961..e70e6c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -23,10 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.SchemaUtil;
@@ -412,12 +409,19 @@ public class ProjectionPushDownRule extends
}
}
- // Getting target names
- final int targetNum = node.getTargets().length;
- final String [] targetNames = new String[targetNum];
- for (int i = 0; i < targetNum; i++) {
- Target target = node.getTargets()[i];
- targetNames[i] = newContext.addExpr(target);
+ // Getting eval names
+
+ final String [] aggEvalNames;
+ if (node.hasAggFunctions()) {
+ final int evalNum = node.getAggFunctions().length;
+ aggEvalNames = new String[evalNum];
+ for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) {
+ Target target = node.getTargets()[targetIdx];
+ EvalNode evalNode = node.getAggFunctions()[evalIdx];
+ aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName()));
+ }
+ } else {
+ aggEvalNames = null;
}
// visit a child node
@@ -444,23 +448,50 @@ public class ProjectionPushDownRule extends
}
// Getting projected targets
- List<Target> projectedTargets = TUtil.newList();
- for (Iterator<String> it = getFilteredReferences(targetNames, context.requiredSet); it.hasNext();) {
- String referenceName = it.next();
- Target target = context.targetListMgr.getTarget(referenceName);
+ if (node.hasAggFunctions()) {
+ AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length];
+ int i = 0;
+ for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
- if (context.targetListMgr.isResolved(referenceName)) {
- projectedTargets.add(new Target(new FieldEval(target.getNamedColumn())));
- } else if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
- projectedTargets.add(target);
- context.targetListMgr.resolve(target);
+ String referenceName = it.next();
+ Target target = context.targetListMgr.getTarget(referenceName);
+
+ if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
+ aggEvals[i++] = target.getEvalTree();
+ context.targetListMgr.resolve(target);
+ }
+ }
+ if (aggEvals.length > 0) {
+ node.setAggFunctions(aggEvals);
}
}
node.setInSchema(child.getOutSchema());
- node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+ Target [] targets = buildGroupByTarget(node, aggEvalNames);
+ node.setTargets(targets);
+
return node;
}
+ public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, String [] aggEvalNames) {
+ final int groupingKeyNum = groupbyNode.getGroupingColumns().length;
+ final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0;
+ EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions();
+ Target [] targets = new Target[groupingKeyNum + aggrFuncNum];
+
+ for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
+ targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx]));
+ }
+
+ if (aggEvalNames != null) {
+ for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) {
+ targets[targetIdx] =
+ new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType()));
+ }
+ }
+
+ return targets;
+ }
+
public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
Context newContext = new Context(context);
@@ -548,14 +579,14 @@ public class ProjectionPushDownRule extends
return new FilteredStringsIterator(targetNames, required);
}
- static Iterator<String> getFilteredReferences(String [] targetNames, Set<String> required) {
+ static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) {
return new FilteredStringsIterator(targetNames, required);
}
static class FilteredStringsIterator implements Iterator<String> {
Iterator<String> iterator;
- FilteredStringsIterator(Collection<String> targetNames, Set<String> required) {
+ FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) {
List<String> filtered = TUtil.newList();
for (String name : targetNames) {
if (required.contains(name)) {
@@ -566,7 +597,7 @@ public class ProjectionPushDownRule extends
iterator = filtered.iterator();
}
- FilteredStringsIterator(String [] targetNames, Set<String> required) {
+ FilteredStringsIterator(String [] targetNames, Collection<String> required) {
this(TUtil.newList(targetNames), required);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index e7eaf40..42508a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -32,14 +32,12 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.dataserver.HttpUtil;
import java.io.UnsupportedEncodingException;
@@ -418,14 +416,12 @@ public class TupleUtil {
private static class TupleBlockFilterScanner {
private EvalNode qual;
- private EvalContext qualCtx;
private Iterator<Tuple> iterator;
private Schema schema;
public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
this.schema = schema;
this.qual = qual;
- this.qualCtx = qual.newContext();
this.iterator = tuples.iterator();
}
@@ -435,8 +431,7 @@ public class TupleUtil {
Tuple tuple;
while (iterator.hasNext()) {
tuple = iterator.next();
- qual.eval(qualCtx, schema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
+ if (qual.eval(schema, tuple).isTrue()) {
results.add(tuple);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index cfe3b61..a969326 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -83,7 +83,7 @@ public class GlobalEngine extends AbstractService {
try {
analyzer = new SQLAnalyzer();
converter = new HiveConverter();
- preVerifier = new PreLogicalPlanVerifier();
+ preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
planner = new LogicalPlanner(context.getCatalog());
optimizer = new LogicalOptimizer(context.getConf());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 07655a2..299703c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -130,13 +130,10 @@ public class ExprTestBase {
try {
targets = getRawTargets(query);
- EvalContext [] evalContexts = new EvalContext[targets.length];
Tuple outTuple = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
EvalNode eval = targets[i].getEvalTree();
- evalContexts[i] = eval.newContext();
- eval.eval(evalContexts[i], inputSchema, vtuple);
- outTuple.put(i, eval.terminate(evalContexts[i]));
+ outTuple.put(i, eval.eval(inputSchema, vtuple));
}
for (int i = 0; i < expected.length; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 34ba1ca..0489e37 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -19,7 +19,6 @@
package org.apache.tajo.engine.eval;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -28,13 +27,6 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.function.GeneralFunction;
import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -43,8 +35,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-
import static org.apache.tajo.common.TajoDataTypes.Type.*;
import static org.junit.Assert.*;
@@ -136,15 +126,13 @@ public class TestEvalTree extends ExprTestBase{
schema1.addColumn("table1.score", INT4);
BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
- EvalContext evalCtx = expr.newContext();
assertCloneEqual(expr);
VTuple tuple = new VTuple(2);
tuple.put(0, DatumFactory.createInt4(1)); // put 0th field
tuple.put(1, DatumFactory.createInt4(99)); // put 1st field
// the result of evaluation must be 100.
- expr.eval(evalCtx, schema1, tuple);
- assertEquals(expr.terminate(evalCtx).asInt4(), 100);
+ assertEquals(expr.eval(schema1, tuple).asInt4(), 100);
}
public static class MockTrueEval extends EvalNode {
@@ -159,7 +147,7 @@ public class TestEvalTree extends ExprTestBase{
}
@Override
- public Datum terminate(EvalContext ctx) {
+ public Datum eval(Schema schema, Tuple tuple) {
return DatumFactory.createBool(true);
}
@@ -169,11 +157,6 @@ public class TestEvalTree extends ExprTestBase{
}
@Override
- public EvalContext newContext() {
- return null;
- }
-
- @Override
public DataType getValueType() {
return CatalogUtil.newSimpleDataType(BOOLEAN);
}
@@ -187,12 +170,7 @@ public class TestEvalTree extends ExprTestBase{
}
@Override
- public EvalContext newContext() {
- return null;
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
+ public Datum eval(Schema schema, Tuple tuple) {
return DatumFactory.createBool(false);
}
@@ -218,24 +196,16 @@ public class TestEvalTree extends ExprTestBase{
MockFalseExpr falseExpr = new MockFalseExpr();
BinaryEval andExpr = new BinaryEval(EvalType.AND, trueExpr, trueExpr);
- EvalContext evalCtx = andExpr.newContext();
- andExpr.eval(evalCtx, null, null);
- assertTrue(andExpr.terminate(evalCtx).asBool());
+ assertTrue(andExpr.eval(null, null).asBool());
andExpr = new BinaryEval(EvalType.AND, falseExpr, trueExpr);
- evalCtx = andExpr.newContext();
- andExpr.eval(evalCtx, null, null);
- assertFalse(andExpr.terminate(evalCtx).asBool());
+ assertFalse(andExpr.eval(null, null).asBool());
andExpr = new BinaryEval(EvalType.AND, trueExpr, falseExpr);
- evalCtx= andExpr.newContext();
- andExpr.eval(evalCtx, null, null);
- assertFalse(andExpr.terminate(evalCtx).asBool());
+ assertFalse(andExpr.eval(null, null).asBool());
andExpr = new BinaryEval(EvalType.AND, falseExpr, falseExpr);
- evalCtx= andExpr.newContext();
- andExpr.eval(evalCtx, null, null);
- assertFalse(andExpr.terminate(evalCtx).asBool());
+ assertFalse(andExpr.eval(null, null).asBool());
}
@Test
@@ -244,24 +214,16 @@ public class TestEvalTree extends ExprTestBase{
MockFalseExpr falseExpr = new MockFalseExpr();
BinaryEval orExpr = new BinaryEval(EvalType.OR, trueExpr, trueExpr);
- EvalContext evalCtx= orExpr.newContext();
- orExpr.eval(evalCtx, null, null);
- assertTrue(orExpr.terminate(evalCtx).asBool());
+ assertTrue(orExpr.eval(null, null).asBool());
orExpr = new BinaryEval(EvalType.OR, falseExpr, trueExpr);
- evalCtx= orExpr.newContext();
- orExpr.eval(evalCtx, null, null);
- assertTrue(orExpr.terminate(evalCtx).asBool());
+ assertTrue(orExpr.eval(null, null).asBool());
orExpr = new BinaryEval(EvalType.OR, trueExpr, falseExpr);
- evalCtx= orExpr.newContext();
- orExpr.eval(evalCtx, null, null);
- assertTrue(orExpr.terminate(evalCtx).asBool());
+ assertTrue(orExpr.eval(null, null).asBool());
orExpr = new BinaryEval(EvalType.OR, falseExpr, falseExpr);
- evalCtx = orExpr.newContext();
- orExpr.eval(evalCtx, null, null);
- assertFalse(orExpr.terminate(evalCtx).asBool());
+ assertFalse(orExpr.eval(null, null).asBool());
}
@Test
@@ -274,73 +236,41 @@ public class TestEvalTree extends ExprTestBase{
e1 = new ConstEval(DatumFactory.createInt4(9));
e2 = new ConstEval(DatumFactory.createInt4(34));
expr = new BinaryEval(EvalType.LTH, e1, e2);
- EvalContext evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LEQ, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LTH, e2, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LEQ, e2, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GTH, e2, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GEQ, e2, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GTH, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GEQ, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
BinaryEval plus = new BinaryEval(EvalType.PLUS, e1, e2);
expr = new BinaryEval(EvalType.LTH, e1, plus);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LEQ, e1, plus);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LTH, plus, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.LEQ, plus, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GTH, plus, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GEQ, plus, e1);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GTH, e1, plus);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
expr = new BinaryEval(EvalType.GEQ, e1, plus);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertFalse(expr.terminate(evalCtx).asBool());
+ assertFalse(expr.eval(null, null).asBool());
}
@Test
@@ -353,36 +283,28 @@ public class TestEvalTree extends ExprTestBase{
e1 = new ConstEval(DatumFactory.createInt4(9));
e2 = new ConstEval(DatumFactory.createInt4(34));
BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
- EvalContext evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertEquals(expr.terminate(evalCtx).asInt4(), 43);
+ assertEquals(expr.eval(null, null).asInt4(), 43);
assertCloneEqual(expr);
// MINUS
e1 = new ConstEval(DatumFactory.createInt4(5));
e2 = new ConstEval(DatumFactory.createInt4(2));
expr = new BinaryEval(EvalType.MINUS, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertEquals(expr.terminate(evalCtx).asInt4(), 3);
+ assertEquals(expr.eval(null, null).asInt4(), 3);
assertCloneEqual(expr);
// MULTIPLY
e1 = new ConstEval(DatumFactory.createInt4(5));
e2 = new ConstEval(DatumFactory.createInt4(2));
expr = new BinaryEval(EvalType.MULTIPLY, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertEquals(expr.terminate(evalCtx).asInt4(), 10);
+ assertEquals(expr.eval(null, null).asInt4(), 10);
assertCloneEqual(expr);
// DIVIDE
e1 = new ConstEval(DatumFactory.createInt4(10));
e2 = new ConstEval(DatumFactory.createInt4(5));
expr = new BinaryEval(EvalType.DIVIDE, e1, e2);
- evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertEquals(expr.terminate(evalCtx).asInt4(), 2);
+ assertEquals(expr.eval(null, null).asInt4(), 2);
assertCloneEqual(expr);
}
@@ -398,9 +320,7 @@ public class TestEvalTree extends ExprTestBase{
assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType());
expr = new BinaryEval(EvalType.LTH, e1, e2);
- EvalContext evalCtx = expr.newContext();
- expr.eval(evalCtx, null, null);
- assertTrue(expr.terminate(evalCtx).asBool());
+ assertTrue(expr.eval(null, null).asBool());
assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType());
e1 = new ConstEval(DatumFactory.createFloat8(9.3));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 86c1c01..86d603e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -219,19 +219,15 @@ public class TestEvalTreeUtil {
EvalNode first = cnf[0];
EvalNode second = cnf[1];
- FieldEval field = (FieldEval) first.getLeftExpr();
+ FieldEval field = first.getLeftExpr();
assertEquals(col1, field.getColumnRef());
assertEquals(EvalType.LTH, first.getType());
- EvalContext firstRCtx = first.getRightExpr().newContext();
- first.getRightExpr().eval(firstRCtx, null, null);
- assertEquals(10, first.getRightExpr().terminate(firstRCtx).asInt4());
+ assertEquals(10, first.getRightExpr().eval(null, null).asInt4());
- field = (FieldEval) second.getRightExpr();
+ field = second.getRightExpr();
assertEquals(col1, field.getColumnRef());
assertEquals(EvalType.LTH, second.getType());
- EvalContext secondLCtx = second.getLeftExpr().newContext();
- second.getLeftExpr().eval(secondLCtx, null, null);
- assertEquals(4, second.getLeftExpr().terminate(secondLCtx).asInt4());
+ assertEquals(4, second.getLeftExpr().eval(null, null).asInt4());
}
@Test
@@ -264,15 +260,11 @@ public class TestEvalTreeUtil {
public final void testSimplify() throws PlanningException {
Target [] targets = getRawTargets(QUERIES[0]);
EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
- EvalContext nodeCtx = node.newContext();
assertEquals(EvalType.CONST, node.getType());
- node.eval(nodeCtx, null, null);
- assertEquals(7, node.terminate(nodeCtx).asInt4());
+ assertEquals(7, node.eval(null, null).asInt4());
node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
assertEquals(EvalType.CONST, node.getType());
- nodeCtx = node.newContext();
- node.eval(nodeCtx, null, null);
- assertTrue(7.0d == node.terminate(nodeCtx).asFloat8());
+ assertTrue(7.0d == node.eval(null, null).asFloat8());
Expr expr = analyzer.parse(QUERIES[1]);
LogicalPlan plan = planner.createPlan(expr);
@@ -300,9 +292,7 @@ public class TestEvalTreeUtil {
assertEquals(EvalType.GTH, transposed.getType());
FieldEval field = transposed.getLeftExpr();
assertEquals(col1, field.getColumnRef());
- EvalContext evalCtx = transposed.getRightExpr().newContext();
- transposed.getRightExpr().eval(evalCtx, null, null);
- assertEquals(1, transposed.getRightExpr().terminate(evalCtx).asInt4());
+ assertEquals(1, transposed.getRightExpr().eval(null, null).asInt4());
node = getRootSelection(QUERIES[4]);
// we expect that score < 3
@@ -310,9 +300,7 @@ public class TestEvalTreeUtil {
assertEquals(EvalType.LTH, transposed.getType());
field = transposed.getLeftExpr();
assertEquals(col1, field.getColumnRef());
- evalCtx = transposed.getRightExpr().newContext();
- transposed.getRightExpr().eval(evalCtx, null, null);
- assertEquals(2, transposed.getRightExpr().terminate(evalCtx).asInt4());
+ assertEquals(2, transposed.getRightExpr().eval(null, null).asInt4());
}
@Test
@@ -321,11 +309,11 @@ public class TestEvalTreeUtil {
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(expr);
GroupbyNode groupByNode = plan.getRootBlock().getNode(NodeType.GROUP_BY);
- Target [] targets = groupByNode.getTargets();
+ EvalNode [] aggEvals = groupByNode.getAggFunctions();
List<AggregationFunctionCallEval> list = new ArrayList<AggregationFunctionCallEval>();
- for (int i = 0; i < targets.length; i++) {
- list.addAll(EvalTreeUtil.findDistinctAggFunction(targets[i].getEvalTree()));
+ for (int i = 0; i < aggEvals.length; i++) {
+ list.addAll(EvalTreeUtil.findDistinctAggFunction(aggEvals[i]));
}
assertEquals(2, list.size());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 6758189..9eb0276 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -570,7 +570,7 @@ public class TestLogicalPlanner {
static final String ALIAS [] = {
"select deptName, sum(score) as total from score group by deptName",
- "select em.empId as id, sum(score) as total from employee as em inner join score using (em.deptName)"
+ "select em.empId as id, sum(score) as total from employee as em inner join score using (em.deptName) group by id"
};
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 85ea4f2..00ce501 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -91,28 +91,6 @@ public class TestPlannerUtil {
}
@Test
- public final void testTransformTwoPhase() throws PlanningException {
- // without 'having clause'
- Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[7]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
-
- assertEquals(NodeType.ROOT, plan.getType());
- LogicalRootNode root = (LogicalRootNode) plan;
- TestLogicalPlanner.testQuery7(root.getChild());
-
- root.postOrder(new TwoPhaseBuilder());
- }
-
- private final class TwoPhaseBuilder implements LogicalNodeVisitor {
- @Override
- public void visit(LogicalNode node) {
- if (node.getType() == NodeType.GROUP_BY) {
- PlannerUtil.transformGroupbyTo2P((GroupbyNode) node);
- }
- }
- }
-
- @Test
public final void testFindTopNode() throws CloneNotSupportedException, PlanningException {
// two relations
Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[1]);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 0f3c165..ab27a45 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,9 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -35,14 +37,12 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
@@ -563,12 +563,8 @@ public class TestPhysicalPlanner {
// Set all aggregation functions to the first phase mode
GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
- for (Target target : groupbyNode.getTargets()) {
- for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
- if (eval instanceof AggregationFunctionCallEval) {
- ((AggregationFunctionCallEval) eval).setFirstPhase();
- }
- }
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ function.setFirstPhase();
}
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -597,12 +593,8 @@ public class TestPhysicalPlanner {
// Set all aggregation functions to the first phase mode
GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
- for (Target target : groupbyNode.getTargets()) {
- for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
- if (eval instanceof AggregationFunctionCallEval) {
- ((AggregationFunctionCallEval) eval).setFirstPhase();
- }
- }
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ function.setFirstPhase();
}
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
[2/3] TAJO-539: Change some EvalNode::eval to directly return a Datum
value.
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 341a58a..3e756d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -31,9 +31,12 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
@@ -212,15 +215,12 @@ public class GlobalPlanner {
// setup current block
ExecutionBlock currentBlock = context.plan.newExecutionBlock();
LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
- for (Target target : groupbyNode.getTargets()) {
- List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
- for (AggregationFunctionCallEval function : functions) {
- if (function.isDistinct()) {
- columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
- } else {
- // See the comment of this method. the aggregation function should be executed as the first phase.
- function.setFirstPhase();
- }
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ if (function.isDistinct()) {
+ columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+ } else {
+ // See the comment of this method. the aggregation function should be executed as the first phase.
+ function.setFirstPhase();
}
}
@@ -247,65 +247,140 @@ public class GlobalPlanner {
return currentBlock;
}
- private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+ private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
GroupbyNode groupbyNode) throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
- if (groupbyNode.isDistinct()) {
- return buildDistinctGroupBy(context, childBlock, groupbyNode);
+ if (groupbyNode.isDistinct()) { // if there is at least one distinct aggregation function
+ return buildDistinctGroupBy(context, lastBlock, groupbyNode);
} else {
- GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
+ GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
- if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
- ((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+ if (hasUnionChild(firstPhaseGroupby)) {
+ currentBlock = buildGroupbyAndUnionPlan(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+ } else {
+ // general hash-shuffled aggregation
+ currentBlock = buildTwoPhaseGroupby(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+ }
+ }
- currentBlock = childBlock;
- for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
- if (firstPhaseGroupBy.isEmptyGrouping()) {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
- } else {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
- }
- dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+ return currentBlock;
+ }
- ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
- GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan(), firstPhaseGroupBy);
- g1.setChild(subBlock.getPlan());
- subBlock.setPlan(g1);
-
- GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan(), groupbyNode);
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- g2.setChild(scanNode);
- currentBlock.setPlan(g2);
- }
- } else { // general hash-shuffled aggregation
- childBlock.setPlan(firstPhaseGroupBy);
- currentBlock = masterPlan.newExecutionBlock();
+ public boolean hasUnionChild(UnaryNode node) {
- DataChannel channel;
- if (firstPhaseGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
- channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
- } else {
- channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
- channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
- }
- channel.setSchema(firstPhaseGroupBy.getOutSchema());
- channel.setStoreType(storeType);
+ if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) {
+ TableSubQueryNode tableSubQuery = node.getChild();
+ return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+ }
+
+ return false;
+ }
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- groupbyNode.setChild(scanNode);
- groupbyNode.setInSchema(scanNode.getOutSchema());
- currentBlock.setPlan(groupbyNode);
- masterPlan.addConnect(channel);
+ private static ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+ GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+ DataChannel lastDataChannel = null;
+
+ // It pushes down the first phase group-by operator into all child blocks.
+ //
+ // (second phase) G (currentBlock)
+ // /|\
+ // / / | \
+ // (first phase) G G G G (child block)
+
+ // They are already connected to one another.
+ // So, we don't need to connect them again.
+ for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+ if (firstPhaseGroupBy.isEmptyGrouping()) {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+ } else {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
}
+ dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+ ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+ // Why must firstPhaseGroupby be copied?
+ //
+ // A groupby in each execution block can have different child.
+ // It affects groupby's input schema.
+ GroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+ firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+ childBlock.setPlan(firstPhaseGroupbyCopy);
+
+ // just keep the last data channel.
+ lastDataChannel = dataChannel;
+ }
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+ secondPhaseGroupBy.setChild(scanNode);
+ lastBlock.setPlan(secondPhaseGroupBy);
+ return lastBlock;
+ }
+
+ private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
+ GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+ ExecutionBlock childBlock = latestBlock;
+ childBlock.setPlan(firstPhaseGroupby);
+ ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel channel;
+ if (firstPhaseGroupby.isEmptyGrouping()) {
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+ channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
+ } else {
+ channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+ channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
}
+ channel.setSchema(firstPhaseGroupby.getOutSchema());
+ channel.setStoreType(storeType);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ secondPhaseGroupby.setChild(scanNode);
+ secondPhaseGroupby.setInSchema(scanNode.getOutSchema());
+ currentBlock.setPlan(secondPhaseGroupby);
+
+ masterPlan.addConnect(channel);
return currentBlock;
}
+ public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode groupBy) {
+ Preconditions.checkNotNull(groupBy);
+
+ GroupbyNode firstPhaseGroupBy = PlannerUtil.clone(plan, groupBy);
+ GroupbyNode secondPhaseGroupBy = groupBy;
+
+ // Set first phase expressions
+ if (secondPhaseGroupBy.hasAggFunctions()) {
+ int evalNum = secondPhaseGroupBy.getAggFunctions().length;
+ AggregationFunctionCallEval [] secondPhaseEvals = secondPhaseGroupBy.getAggFunctions();
+ AggregationFunctionCallEval [] firstPhaseEvals = new AggregationFunctionCallEval[evalNum];
+
+ String [] firstPhaseEvalNames = new String[evalNum];
+ for (int i = 0; i < evalNum; i++) {
+ try {
+ firstPhaseEvals[i] = (AggregationFunctionCallEval) secondPhaseEvals[i].clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+
+ firstPhaseEvals[i].setFirstPhase();
+ firstPhaseEvalNames[i] = plan.newGeneratedFieldName(firstPhaseEvals[i]);
+ FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+ secondPhaseEvals[i].setArgs(new EvalNode[] {param});
+ }
+
+ secondPhaseGroupBy.setAggFunctions(secondPhaseEvals);
+ firstPhaseGroupBy.setAggFunctions(firstPhaseEvals);
+ Target [] firstPhaseTargets = ProjectionPushDownRule.buildGroupByTarget(firstPhaseGroupBy, firstPhaseEvalNames);
+ firstPhaseGroupBy.setTargets(firstPhaseTargets);
+ secondPhaseGroupBy.setInSchema(PlannerUtil.targetToSchema(firstPhaseTargets));
+ }
+ return firstPhaseGroupBy;
+ }
+
private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 41f1e88..8acd32e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -20,14 +20,22 @@ package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.util.TUtil;
public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
- @Expose private Column [] columns;
- @Expose private Target [] targets;
+ /** Grouping key sets */
+ @Expose private Column [] groupingColumns;
+ /** Aggregation Functions */
+ @Expose private AggregationFunctionCallEval [] aggrFunctions;
+ /**
+ * It's a list of targets. The grouping columns should be followed by aggregation functions.
+ * aggrFunctions keep actual aggregation functions, but it only contains field references.
+ * */
+ @Expose private Target [] targets;
@Expose private boolean hasDistinct = false;
public GroupbyNode(int pid) {
@@ -35,15 +43,15 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
}
public final boolean isEmptyGrouping() {
- return columns == null || columns.length == 0;
+ return groupingColumns == null || groupingColumns.length == 0;
}
public void setGroupingColumns(Column [] groupingColumns) {
- this.columns = groupingColumns;
+ this.groupingColumns = groupingColumns;
}
public final Column [] getGroupingColumns() {
- return this.columns;
+ return this.groupingColumns;
}
public final boolean isDistinct() {
@@ -54,6 +62,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
hasDistinct = distinct;
}
+ public boolean hasAggFunctions() {
+ return this.aggrFunctions != null;
+ }
+
+ public AggregationFunctionCallEval [] getAggFunctions() {
+ return this.aggrFunctions;
+ }
+
+ public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+ this.aggrFunctions = evals;
+ }
+
@Override
public boolean hasTargets() {
return this.targets != null;
@@ -76,9 +96,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
public String toString() {
StringBuilder sb = new StringBuilder("\"GroupBy\": {\"grouping fields\":[");
- for (int i=0; i < columns.length; i++) {
- sb.append("\"").append(columns[i]).append("\"");
- if(i < columns.length - 1)
+ for (int i=0; i < groupingColumns.length; i++) {
+ sb.append("\"").append(groupingColumns[i]).append("\"");
+ if(i < groupingColumns.length - 1)
sb.append(",");
}
@@ -92,6 +112,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
}
sb.append("],");
}
+ if (aggrFunctions != null) {
+ sb.append("\n \"expr\": ").append(TUtil.arrayToString(aggrFunctions)).append(",");
+ }
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
sb.append("\n \"in schema\": ").append(getInSchema());
sb.append("}");
@@ -104,7 +127,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
if (obj instanceof GroupbyNode) {
GroupbyNode other = (GroupbyNode) obj;
boolean eq = super.equals(other);
- eq = eq && TUtil.checkEquals(columns, other.columns);
+ eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+ eq = eq && TUtil.checkEquals(aggrFunctions, other.aggrFunctions);
eq = eq && TUtil.checkEquals(targets, other.targets);
return eq;
} else {
@@ -115,10 +139,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
GroupbyNode grp = (GroupbyNode) super.clone();
- if (columns != null) {
- grp.columns = new Column[columns.length];
- for (int i = 0; i < columns.length; i++) {
- grp.columns[i] = (Column) columns[i].clone();
+ if (groupingColumns != null) {
+ grp.groupingColumns = new Column[groupingColumns.length];
+ for (int i = 0; i < groupingColumns.length; i++) {
+ grp.groupingColumns[i] = (Column) groupingColumns[i].clone();
+ }
+ }
+
+ if (aggrFunctions != null) {
+ grp.aggrFunctions = new AggregationFunctionCallEval[aggrFunctions.length];
+ for (int i = 0; i < aggrFunctions.length; i++) {
+ grp.aggrFunctions[i] = (AggregationFunctionCallEval) aggrFunctions[i].clone();
}
}
@@ -138,7 +169,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
StringBuilder sb = new StringBuilder();
sb.append("(");
- Column [] groupingColumns = columns;
+ Column [] groupingColumns = this.groupingColumns;
for (int j = 0; j < groupingColumns.length; j++) {
sb.append(groupingColumns[j].getColumnName());
if(j < groupingColumns.length - 1) {
@@ -150,6 +181,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
planStr.appendTitle(sb.toString());
+ sb = new StringBuilder();
+ sb.append("(");
+ for (int j = 0; j < aggrFunctions.length; j++) {
+ sb.append(aggrFunctions[j]);
+ if(j < aggrFunctions.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
sb = new StringBuilder("target list: ");
for (int i = 0; i < targets.length; i++) {
sb.append(targets[i]);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index ed82cef..eb69703 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -18,31 +18,22 @@
package org.apache.tajo.engine.planner.physical;
-import com.google.common.collect.Sets;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.util.List;
-import java.util.Set;
public abstract class AggregationExec extends UnaryPhysicalExec {
protected GroupbyNode plan;
- protected Set<Column> nonNullGroupingFields;
- protected int keylist [];
- protected int measureList[];
- protected final EvalNode evals [];
- protected EvalContext evalContexts [];
+ protected final int groupingKeyNum;
+ protected int groupingKeyIds[];
+ protected final int aggFunctionsNum;
+ protected final AggregationFunctionCallEval aggFunctions[];
+
protected Schema evalSchema;
public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
@@ -52,47 +43,26 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
evalSchema = plan.getOutSchema();
- nonNullGroupingFields = Sets.newHashSet();
- // keylist will contain a list of IDs of grouping column
- keylist = new int[plan.getGroupingColumns().length];
+ final Column [] keyColumns = plan.getGroupingColumns();
+ groupingKeyNum = keyColumns.length;
+ groupingKeyIds = new int[groupingKeyNum];
Column col;
for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
- col = plan.getGroupingColumns()[idx];
- keylist[idx] = inSchema.getColumnId(col.getQualifiedName());
- nonNullGroupingFields.add(col);
- }
-
- // measureList will contain a list of measure field indexes against the target list.
- List<Integer> measureIndexes = TUtil.newList();
- for (int i = 0; i < plan.getTargets().length; i++) {
- Target target = plan.getTargets()[i];
- if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
- measureIndexes.add(i);
- }
- }
-
- measureList = new int[measureIndexes.size()];
- for (int i = 0; i < measureIndexes.size(); i++) {
- measureList[i] = measureIndexes.get(i);
+ col = keyColumns[idx];
+ groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
}
- evals = new EvalNode[plan.getTargets().length];
- evalContexts = new EvalContext[plan.getTargets().length];
- for (int i = 0; i < plan.getTargets().length; i++) {
- Target t = plan.getTargets()[i];
- if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
- evals[i] = new ConstEval(DatumFactory.createNullDatum());
- evalContexts[i] = evals[i].newContext();
- } else {
- evals[i] = t.getEvalTree();
- evalContexts[i] = evals[i].newContext();
- }
+ if (plan.hasAggFunctions()) {
+ aggFunctions = plan.getAggFunctions();
+ aggFunctionsNum = aggFunctions.length;
+ } else {
+ aggFunctions = new AggregationFunctionCallEval[0];
+ aggFunctionsNum = 0;
}
}
@Override
public void close() throws IOException {
super.close();
- nonNullGroupingFields.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index cf56200..b39a9f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,15 +18,14 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +37,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
private final JoinNode plan;
private final boolean hasJoinQual;
private final EvalNode joinQual;
- private final EvalContext qualCtx;
private final List<Tuple> leftTupleSlots;
private final List<Tuple> rightTupleSlots;
@@ -58,7 +56,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -67,10 +64,8 @@ public class BNLJoinExec extends BinaryPhysicalExec {
this.joinQual = plan.getJoinQual();
if (joinQual != null) { // if join type is not 'cross join'
hasJoinQual = true;
- this.qualCtx = this.joinQual.newContext();
} else {
hasJoinQual = false;
- this.qualCtx = null;
}
this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
@@ -85,7 +80,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
}
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -191,15 +185,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
frameTuple.set(leftTuple, rightIterator.next());
if (hasJoinQual) {
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outputTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outputTuple);
return outputTuple;
}
} else {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outputTuple);
+ projector.eval(frameTuple, outputTuple);
return outputTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 30c1fa7..2ff6fc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,16 +19,15 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -37,11 +36,9 @@ public class BSTIndexScanExec extends PhysicalExec {
private SeekableScanner fileScanner;
private EvalNode qual;
- private EvalContext qualCtx;
private BSTIndex.BSTIndexReader reader;
private final Projector projector;
- private EvalContext [] evalContexts;
private Datum[] datum = null;
@@ -54,18 +51,12 @@ public class BSTIndexScanExec extends PhysicalExec {
super(context, scanNode.getInSchema(), scanNode.getOutSchema());
this.scanNode = scanNode;
this.qual = scanNode.getQual();
- if(this.qual == null) {
- this.qualCtx = null;
- } else {
- this.qualCtx = this.qual.newContext();
- }
this.datum = datum;
this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
- this.evalContexts = projector.newContexts();
this.reader = new BSTIndex(sm.getFileSystem().getConf()).
getIndexReader(fileName, keySchema, comparator);
@@ -109,18 +100,15 @@ public class BSTIndexScanExec extends PhysicalExec {
Tuple outTuple = new VTuple(this.outSchema.getColumnNum());
if (!scanNode.hasQual()) {
if ((tuple = fileScanner.next()) != null) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
return outTuple;
} else {
return null;
}
} else {
- while( reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ projector.eval(tuple, outTuple);
return outTuple;
} else {
fileScanner.seek(reader.next());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 4480747..83580f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,43 +18,32 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.EvalExprNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class EvalExprExec extends PhysicalExec {
private final EvalExprNode plan;
- private final EvalContext[] evalContexts;
public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
super(context, plan.getInSchema(), plan.getOutSchema());
this.plan = plan;
-
- evalContexts = new EvalContext[plan.getTargets().length];
- for (int i = 0; i < plan.getTargets().length; i++) {
- evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
- }
}
@Override
public void init() throws IOException {
}
- /* (non-Javadoc)
- * @see PhysicalExec#next()
- */
@Override
public Tuple next() throws IOException {
Target [] targets = plan.getTargets();
Tuple t = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
- targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);
- t.put(i, targets[i].getEvalTree().terminate(evalContexts[i]));
+ t.put(i, targets[i].getEvalTree().eval(inSchema, null));
}
return t;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index ed2f275..d533b82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,7 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -35,44 +35,38 @@ import java.util.Map.Entry;
*/
public class HashAggregateExec extends AggregationExec {
private Tuple tuple = null;
- private Map<Tuple, EvalContext[]> tupleSlots;
+ private Map<Tuple, FunctionContext[]> hashTable;
private boolean computed = false;
- private Iterator<Entry<Tuple, EvalContext []>> iterator = null;
+ private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
- /**
- * @throws java.io.IOException
- *
- */
- public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode annotation,
- PhysicalExec subOp) throws IOException {
- super(ctx, annotation, subOp);
- tupleSlots = new HashMap<Tuple, EvalContext[]>(10000);
- this.tuple = new VTuple(evalSchema.getColumnNum());
+ public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+ super(ctx, plan, subOp);
+ hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+ this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
}
private void compute() throws IOException {
Tuple tuple;
Tuple keyTuple;
- int targetLength = plan.getTargets().length;
while((tuple = child.next()) != null && !context.isStopped()) {
- keyTuple = new VTuple(keylist.length);
+ keyTuple = new VTuple(groupingKeyIds.length);
// build one key tuple
- for(int i = 0; i < keylist.length; i++) {
- keyTuple.put(i, tuple.get(keylist[i]));
+ for(int i = 0; i < groupingKeyIds.length; i++) {
+ keyTuple.put(i, tuple.get(groupingKeyIds[i]));
}
- if(tupleSlots.containsKey(keyTuple)) {
- EvalContext [] tmpTuple = tupleSlots.get(keyTuple);
- for(int i = 0; i < measureList.length; i++) {
- evals[measureList[i]].eval(tmpTuple[measureList[i]], inSchema, tuple);
+ if(hashTable.containsKey(keyTuple)) {
+ FunctionContext [] contexts = hashTable.get(keyTuple);
+ for(int i = 0; i < aggFunctions.length; i++) {
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
} else { // if the key occurs firstly
- EvalContext evalCtx [] = new EvalContext[targetLength];
- for(int i = 0; i < targetLength; i++) {
- evalCtx[i] = evals[i].newContext();
- evals[i].eval(evalCtx[i], inSchema, tuple);
+ FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+ for(int i = 0; i < aggFunctionsNum; i++) {
+ contexts[i] = aggFunctions[i].newContext();
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
- tupleSlots.put(keyTuple, evalCtx);
+ hashTable.put(keyTuple, contexts);
}
}
}
@@ -81,16 +75,23 @@ public class HashAggregateExec extends AggregationExec {
public Tuple next() throws IOException {
if(!computed) {
compute();
- iterator = tupleSlots.entrySet().iterator();
+ iterator = hashTable.entrySet().iterator();
computed = true;
}
- EvalContext [] ctx;
+ FunctionContext [] contexts;
if (iterator.hasNext()) {
- ctx = iterator.next().getValue();
- for (int i = 0; i < ctx.length; i++) {
- tuple.put(i, evals[i].terminate(ctx[i]));
+ Entry<Tuple, FunctionContext []> entry = iterator.next();
+ Tuple keyTuple = entry.getKey();
+ contexts = entry.getValue();
+
+ int tupleIdx = 0;
+ for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+ tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+ }
+ for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+ tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
}
return tuple;
@@ -101,12 +102,12 @@ public class HashAggregateExec extends AggregationExec {
@Override
public void rescan() throws IOException {
- iterator = tupleSlots.entrySet().iterator();
+ iterator = hashTable.entrySet().iterator();
}
@Override
public void close() throws IOException {
super.close();
- tupleSlots.clear();
+ hashTable.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index caea5d9..848e362 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,9 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
@@ -48,7 +47,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -60,7 +58,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -72,7 +69,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
plan.getOutSchema(), outer, inner);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
// this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
@@ -95,7 +91,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -156,8 +151,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
} else {
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
@@ -173,8 +167,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
shouldGetLeftTuple = true;
return outTuple;
@@ -184,10 +177,9 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
- if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
getKeyLeftTuple(leftTuple, leftKeyTuple);
matched.put(leftKeyTuple, true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index e08e07d..08b7035 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,9 +18,7 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -29,6 +27,7 @@ import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
@@ -46,7 +45,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -58,7 +56,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
PhysicalExec rightExec) {
@@ -66,7 +63,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
leftExec, rightExec);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
@@ -85,7 +81,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -131,10 +126,8 @@ public class HashJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 6b2d7b8..d7437c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -80,8 +80,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
} else {
// if not found, it returns a tuple.
frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
@@ -91,16 +90,14 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
while (notFound && iterator.hasNext()) {
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
notFound = false;
}
}
if (notFound) { // if there is no matched tuple
frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 841ff5a..d0c9897 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,9 +18,9 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,13 +30,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// from logical plan
@@ -51,7 +49,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
protected Tuple outTuple = null;
protected Map<Tuple, List<Tuple>> tupleSlots;
protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
protected Tuple leftTuple;
protected Tuple leftKeyTuple;
@@ -63,7 +60,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
- protected final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -75,7 +71,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
plan.getOutSchema(), leftChild, rightChild);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
@@ -93,7 +88,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -137,8 +131,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
shouldGetLeftTuple = true;
return outTuple;
@@ -148,10 +141,8 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
// getting a next right tuple on in-memory hash table.
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+ projector.eval(frameTuple, outTuple);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index c5e2d24..842c2e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -89,11 +89,9 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
while (notFound && iterator.hasNext()) {
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
notFound = false;
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index 41ff7bf..0418f65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -18,21 +18,15 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.HavingNode;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class HavingExec extends UnaryPhysicalExec {
private final EvalNode qual;
- private final EvalContext qualCtx;
- private final Tuple outputTuple;
public HavingExec(TaskAttemptContext context,
HavingNode plan,
@@ -40,17 +34,14 @@ public class HavingExec extends UnaryPhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.qual = plan.getQual();
- this.qualCtx = this.qual.newContext();
- this.outputTuple = new VTuple(outSchema.getColumnNum());
}
@Override
public Tuple next() throws IOException {
Tuple tuple;
while ((tuple = child.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).asBool()) {
- return tuple;
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 446755d..1d6da3f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -41,7 +40,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -62,7 +60,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -78,7 +75,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -93,7 +89,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -148,8 +143,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
rightTuple = rightChild.next();
return outTuple;
@@ -159,8 +153,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
leftTuple = leftChild.next();
return outTuple;
@@ -206,8 +199,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// BEFORE RETURN, MOVE FORWARD
rightTuple = rightChild.next();
if(rightTuple == null) {
@@ -221,8 +213,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
// BEFORE RETURN, MOVE FORWARD
leftTuple = leftChild.next();
@@ -297,9 +288,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
posRightTupleSlots = posRightTupleSlots + 1;
frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
} else {
// right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
@@ -312,9 +302,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
posLeftTupleSlots = posLeftTupleSlots + 1;
frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e128fea..e1d377e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -63,7 +61,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -72,7 +69,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -89,7 +85,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -165,10 +160,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
}
frameTuple.set(outerNext, innerIterator.next());
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 055c66e..961be93 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -18,14 +18,13 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -41,10 +40,8 @@ public class NLJoinExec extends BinaryPhysicalExec {
private Tuple outerTuple = null;
private Tuple innerTuple = null;
private Tuple outTuple = null;
- private EvalContext qualCtx;
// projection
- private final EvalContext [] evalContexts;
private final Projector projector;
public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
@@ -54,12 +51,10 @@ public class NLJoinExec extends BinaryPhysicalExec {
if (plan.hasJoinQual()) {
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
}
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
needNewOuter = true;
@@ -90,15 +85,12 @@ public class NLJoinExec extends BinaryPhysicalExec {
frameTuple.set(outerTuple, innerTuple);
if (joinQual != null) {
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
} else {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 305cdd2..4abe570 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -18,8 +18,6 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
@@ -27,6 +25,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -41,10 +40,8 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
private Tuple leftTuple = null;
private Tuple rightTuple = null;
private Tuple outTuple = null;
- private EvalContext qualCtx;
// projection
- private final EvalContext [] evalContexts;
private final Projector projector;
private boolean foundAtLeastOneMatch;
@@ -57,12 +54,10 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
if (plan.hasJoinQual()) {
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
}
// for projection
projector = new Projector(inSchema, outSchema, plan.getTargets());
- evalContexts = projector.newContexts();
// for join
needNextRightTuple = true;
@@ -96,8 +91,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
//output a tuple with the nulls padded rightTuple
Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
foundAtLeastOneMatch = true;
needNextRightTuple = true;
@@ -111,10 +105,9 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
}
frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).isTrue()) {
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
foundAtLeastOneMatch = true;
return outTuple;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index c57089e..ecc6dd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,12 +21,11 @@
*/
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.planner.logical.Projectable;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -35,7 +34,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
// for projection
private Tuple outTuple;
- private EvalContext[] evalContexts;
private Projector projector;
public ProjectionExec(TaskAttemptContext context, Projectable plan,
@@ -49,7 +47,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
this.outTuple = new VTuple(outSchema.getColumnNum());
this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
- this.evalContexts = projector.newContexts();
}
@Override
@@ -60,8 +57,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
return null;
}
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 2f1d33d..365faba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -19,10 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// from logical plan
private JoinNode joinNode;
private EvalNode joinQual;
- private EvalContext qualCtx;
// temporal tuples and states for nested loop join
private FrameTuple frameTuple;
@@ -61,7 +59,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// projection
private final Projector projector;
- private final EvalContext [] evalContexts;
private int rightNumCols;
private int leftNumCols;
@@ -77,7 +74,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
"but there is no join condition");
this.joinNode = plan;
this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -91,7 +87,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// for projection
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
// for join
frameTuple = new FrameTuple();
@@ -158,8 +153,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded leftTuple
Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
rightTuple = rightChild.next();
@@ -225,8 +219,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
// output a tuple with the nulls padded left tuple
Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(frameTuple, outTuple);
// we simulate we found a match, which is exactly the null padded one
// BEFORE RETURN, MOVE FORWARD
@@ -305,9 +298,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
posRightTupleSlots = posRightTupleSlots + 1;
frameTuple.set(nextLeft, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
} else {
@@ -321,9 +313,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
posLeftTupleSlots = posLeftTupleSlots + 1;
frameTuple.set(nextLeft, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 5158dc0..2e676e9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.SelectionNode;
import org.apache.tajo.storage.Tuple;
@@ -28,23 +27,20 @@ import java.io.IOException;
public class SelectionExec extends UnaryPhysicalExec {
private final EvalNode qual;
- private final EvalContext qualCtx;
public SelectionExec(TaskAttemptContext context,
SelectionNode plan,
PhysicalExec child) {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.qual = plan.getQual();
- this.qualCtx = this.qual.newContext();
}
@Override
public Tuple next() throws IOException {
Tuple tuple;
while ((tuple = child.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).isTrue()) {
- return tuple;
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 6d3d6b1..c38be92 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,21 +18,24 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.HashSet;
@@ -46,12 +49,10 @@ public class SeqScanExec extends PhysicalExec {
private Scanner scanner = null;
private EvalNode qual = null;
- private EvalContext qualCtx;
private CatalogProtos.FragmentProto [] fragments;
private Projector projector;
- private EvalContext [] evalContexts;
public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
@@ -60,12 +61,6 @@ public class SeqScanExec extends PhysicalExec {
this.plan = plan;
this.qual = plan.getQual();
this.fragments = fragments;
-
- if (qual == null) {
- qualCtx = null;
- } else {
- qualCtx = this.qual.newContext();
- }
}
/**
@@ -100,9 +95,7 @@ public class SeqScanExec extends PhysicalExec {
// However, actual values absent in tuples. So, Replace all column references by constant datum.
for (Column column : columnPartitionSchema.toArray()) {
FieldEval targetExpr = new FieldEval(column);
- EvalContext evalContext = targetExpr.newContext();
- targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
- Datum datum = targetExpr.terminate(evalContext);
+ Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
ConstEval constExpr = new ConstEval(datum);
for (Target target : plan.getTargets()) {
if (target.getEvalTree().equals(targetExpr)) {
@@ -151,7 +144,6 @@ public class SeqScanExec extends PhysicalExec {
}
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.newContexts();
if (fragments.length > 1) {
this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
@@ -172,8 +164,7 @@ public class SeqScanExec extends PhysicalExec {
if (!plan.hasQual()) {
if ((tuple = scanner.next()) != null) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+ projector.eval(tuple, outTuple);
outTuple.setOffset(tuple.getOffset());
return outTuple;
} else {
@@ -181,10 +172,9 @@ public class SeqScanExec extends PhysicalExec {
}
} else {
while ((tuple = scanner.next()) != null) {
- qual.eval(qualCtx, inSchema, tuple);
- if (qual.terminate(qualCtx).isTrue()) {
- projector.eval(evalContexts, tuple);
- projector.terminate(evalContexts, outTuple);
+
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ projector.eval(tuple, outTuple);
return outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index af1bf34..dbe45dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,82 +18,108 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
/**
- * This is the sort-based Aggregation Operator.
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Sort Aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * If lastkey is null or lastkey is equivalent to the current key, sort aggregation is changed to this state.
+ * In this state, this operator aggregates measure values via aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * If currentKey is different from the last key, it computes final aggregation results, and then
+ * it makes an output tuple.
*/
public class SortAggregateExec extends AggregationExec {
- private Tuple prevKey = null;
+ private Tuple lastKey = null;
private boolean finished = false;
+ private FunctionContext contexts[];
-
- public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan,
- PhysicalExec child) throws IOException {
+ public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
super(context, plan, child);
+ contexts = new FunctionContext[plan.getAggFunctions().length];
}
@Override
public Tuple next() throws IOException {
- Tuple curKey;
+ Tuple currentKey;
Tuple tuple;
- Tuple finalTuple = null;
+ Tuple outputTuple = null;
+
while(!context.isStopped() && (tuple = child.next()) != null) {
- // build a key tuple
- curKey = new VTuple(keylist.length);
- for(int i = 0; i < keylist.length; i++) {
- curKey.put(i, tuple.get(keylist[i]));
+
+ // get a key tuple
+ currentKey = new VTuple(groupingKeyIds.length);
+ for(int i = 0; i < groupingKeyIds.length; i++) {
+ currentKey.put(i, tuple.get(groupingKeyIds[i]));
}
- if (prevKey == null || prevKey.equals(curKey)) {
- if (prevKey == null) {
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- evalContexts[i] = evals[i].newContext();
- evals[i].eval(evalContexts[i], inSchema, tuple);
+ /** Aggregation State */
+ if (lastKey == null || lastKey.equals(currentKey)) {
+ if (lastKey == null) {
+ for(int i = 0; i < aggFunctionsNum; i++) {
+ contexts[i] = aggFunctions[i].newContext();
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
- prevKey = curKey;
+ lastKey = currentKey;
} else {
// aggregate
- for (int idx : measureList) {
- evals[idx].eval(evalContexts[idx], inSchema, tuple);
+ for (int i = 0; i < aggFunctionsNum; i++) {
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
}
- } else {
+
+ } else { /** Finalization State */
// finalize aggregate and return
- finalTuple = new VTuple(outSchema.getColumnNum());
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+ outputTuple = new VTuple(outSchema.getColumnNum());
+ int tupleIdx = 0;
+
+ for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+ outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+ }
+ for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+ outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
}
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- evalContexts[i] = evals[i].newContext();
- evals[i].eval(evalContexts[i], inSchema, tuple);
+ for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) {
+ contexts[evalIdx] = aggFunctions[evalIdx].newContext();
+ aggFunctions[evalIdx].merge(contexts[evalIdx], inSchema, tuple);
}
- prevKey = curKey;
- return finalTuple;
+
+ lastKey = currentKey;
+ return outputTuple;
}
} // while loop
if (!finished) {
- finalTuple = new VTuple(outSchema.getColumnNum());
- for(int i = 0; i < outSchema.getColumnNum(); i++) {
- finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+ outputTuple = new VTuple(outSchema.getColumnNum());
+ int tupleIdx = 0;
+ for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+ outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+ }
+ for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+ outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
}
finished = true;
}
- return finalTuple;
+ return outputTuple;
}
@Override
public void rescan() throws IOException {
super.rescan();
- prevKey = null;
+ lastKey = null;
finished = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 72ff67c..041220a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -95,16 +95,13 @@ public class PartitionedTableRewriter implements RewriteRule {
}
private static class PartitionPathFilter implements PathFilter {
- private FileSystem fs;
private Schema schema;
private EvalNode partitionFilter;
- private EvalContext evalContext;
public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
this.schema = schema;
this.partitionFilter = partitionFilter;
- evalContext = partitionFilter.newContext();
}
@Override
@@ -113,8 +110,8 @@ public class PartitionedTableRewriter implements RewriteRule {
if (tuple == null) { // if it is a file or not acceptable file
return false;
}
- partitionFilter.eval(evalContext, schema, tuple);
- return partitionFilter.terminate(evalContext).asBool();
+
+ return partitionFilter.eval(schema, tuple).asBool();
}
@Override
[3/3] git commit: TAJO-539: Change some EvalNode::eval to directly
return a Datum value.
Posted by hy...@apache.org.
TAJO-539: Change some EvalNode::eval to directly return a Datum value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e23e78cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e23e78cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e23e78cc
Branch: refs/heads/master
Commit: e23e78ccd4c740fd93d2629e5b523c3f4b29c199
Parents: 0386268
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Jan 25 15:34:35 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Jan 25 15:34:45 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../eval/AggregationFunctionCallEval.java | 41 ++---
.../apache/tajo/engine/eval/AlgebraicUtil.java | 4 +-
.../tajo/engine/eval/BetweenPredicateEval.java | 85 +++------
.../org/apache/tajo/engine/eval/BinaryEval.java | 28 +--
.../apache/tajo/engine/eval/CaseWhenEval.java | 73 ++------
.../org/apache/tajo/engine/eval/CastEval.java | 25 +--
.../org/apache/tajo/engine/eval/ConstEval.java | 18 +-
.../apache/tajo/engine/eval/EvalContext.java | 22 ---
.../org/apache/tajo/engine/eval/EvalNode.java | 6 +-
.../apache/tajo/engine/eval/EvalTreeUtil.java | 5 +-
.../org/apache/tajo/engine/eval/FieldEval.java | 22 +--
.../apache/tajo/engine/eval/FunctionEval.java | 20 +--
.../tajo/engine/eval/GeneralFunctionEval.java | 20 +--
.../org/apache/tajo/engine/eval/InEval.java | 29 +--
.../org/apache/tajo/engine/eval/IsNullEval.java | 30 +---
.../org/apache/tajo/engine/eval/NotEval.java | 20 +--
.../tajo/engine/eval/PartialBinaryExpr.java | 14 +-
.../engine/eval/PatternMatchPredicateEval.java | 30 +---
.../tajo/engine/eval/RowConstantEval.java | 20 +--
.../org/apache/tajo/engine/eval/SignedEval.java | 21 +--
.../tajo/engine/planner/ExprsVerifier.java | 13 +-
.../engine/planner/LogicalPlanVerifier.java | 11 --
.../tajo/engine/planner/LogicalPlanner.java | 165 ++++++++++-------
.../apache/tajo/engine/planner/PlannerUtil.java | 96 +---------
.../engine/planner/PreLogicalPlanVerifier.java | 69 +++++++-
.../apache/tajo/engine/planner/Projector.java | 20 +--
.../engine/planner/global/GlobalPlanner.java | 177 +++++++++++++------
.../engine/planner/logical/GroupbyNode.java | 70 ++++++--
.../planner/physical/AggregationExec.java | 64 ++-----
.../engine/planner/physical/BNLJoinExec.java | 19 +-
.../planner/physical/BSTIndexScanExec.java | 24 +--
.../engine/planner/physical/EvalExprExec.java | 15 +-
.../planner/physical/HashAggregateExec.java | 65 +++----
.../planner/physical/HashFullOuterJoinExec.java | 20 +--
.../engine/planner/physical/HashJoinExec.java | 13 +-
.../planner/physical/HashLeftAntiJoinExec.java | 9 +-
.../planner/physical/HashLeftOuterJoinExec.java | 21 +--
.../planner/physical/HashLeftSemiJoinExec.java | 6 +-
.../engine/planner/physical/HavingExec.java | 13 +-
.../physical/MergeFullOuterJoinExec.java | 29 +--
.../engine/planner/physical/MergeJoinExec.java | 14 +-
.../engine/planner/physical/NLJoinExec.java | 16 +-
.../planner/physical/NLLeftOuterJoinExec.java | 17 +-
.../engine/planner/physical/ProjectionExec.java | 10 +-
.../physical/RightOuterMergeJoinExec.java | 23 +--
.../engine/planner/physical/SelectionExec.java | 8 +-
.../engine/planner/physical/SeqScanExec.java | 42 ++---
.../planner/physical/SortAggregateExec.java | 94 ++++++----
.../rewrite/PartitionedTableRewriter.java | 7 +-
.../planner/rewrite/ProjectionPushDownRule.java | 77 +++++---
.../org/apache/tajo/engine/utils/TupleUtil.java | 7 +-
.../org/apache/tajo/master/GlobalEngine.java | 2 +-
.../apache/tajo/engine/eval/ExprTestBase.java | 5 +-
.../apache/tajo/engine/eval/TestEvalTree.java | 144 ++++-----------
.../tajo/engine/eval/TestEvalTreeUtil.java | 34 ++--
.../tajo/engine/planner/TestLogicalPlanner.java | 2 +-
.../tajo/engine/planner/TestPlannerUtil.java | 22 ---
.../planner/physical/TestPhysicalPlanner.java | 24 +--
59 files changed, 774 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f4fa4e3..c04b2d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,9 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-539: Change some EvalNode::eval to directly return a Datum value.
+ (hyunsik)
+
TAJO-543: InsertNode and CreateTableNode should play their roles. (hyunsik)
TAJO-409: Add explored and explained annotations to Tajo function system.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
index 290ff45..10eadce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -21,10 +21,10 @@ package org.apache.tajo.engine.eval;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -38,40 +38,38 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
this.instance = instance;
}
- @Override
- public EvalContext newContext() {
- return new AggFunctionCtx(argEvals, instance.newContext());
+ public FunctionContext newContext() {
+ return instance.newContext();
}
- @Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- AggFunctionCtx localCtx = (AggFunctionCtx) ctx;
+ public void merge(FunctionContext context, Schema schema, Tuple tuple) {
if (params == null) {
this.params = new VTuple(argEvals.length);
}
if (argEvals != null) {
- params.clear();
-
for (int i = 0; i < argEvals.length; i++) {
- argEvals[i].eval(localCtx.argCtxs[i], schema, tuple);
- params.put(i, argEvals[i].terminate(localCtx.argCtxs[i]));
+ params.put(i, argEvals[i].eval(schema, tuple));
}
}
if (firstPhase) {
- instance.eval(localCtx.funcCtx, params);
+ instance.eval(context, params);
} else {
- instance.merge(localCtx.funcCtx, params);
+ instance.merge(context, params);
}
}
@Override
- public Datum terminate(EvalContext ctx) {
+ public Datum eval(Schema schema, Tuple tuple) {
+ throw new UnsupportedOperationException("Cannot execute eval() of aggregation function");
+ }
+
+ public Datum terminate(FunctionContext context) {
if (firstPhase) {
- return instance.getPartialResult(((AggFunctionCtx)ctx).funcCtx);
+ return instance.getPartialResult(context);
} else {
- return instance.terminate(((AggFunctionCtx)ctx).funcCtx);
+ return instance.terminate(context);
}
}
@@ -91,13 +89,4 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
public void setFirstPhase() {
this.firstPhase = true;
}
-
- protected class AggFunctionCtx extends FuncCallCtx {
- FunctionContext funcCtx;
-
- AggFunctionCtx(EvalNode [] argEvals, FunctionContext funcCtx) {
- super(argEvals);
- this.funcCtx = funcCtx;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
index 70ae712..6bb0160 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
@@ -157,9 +157,7 @@ public class AlgebraicUtil {
right = eliminateConstantExprs(right);
if (left.getType() == EvalType.CONST && right.getType() == EvalType.CONST) {
- EvalContext exprCtx = expr.newContext();
- expr.eval(exprCtx, null, null);
- return new ConstEval(expr.terminate(exprCtx));
+ return new ConstEval(expr.eval(null, null));
} else {
return new BinaryEval(expr.getType(), left, right);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
index 0215928..61dc02b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
@@ -47,7 +47,7 @@ public class BetweenPredicateEval extends EvalNode {
}
private static interface Checker {
- void eval(BetweenContext context, Schema schema, Tuple param);
+ Datum eval(Schema schema, Tuple param);
}
private static class ConstantChecker implements Checker {
@@ -69,16 +69,14 @@ public class BetweenPredicateEval extends EvalNode {
}
@Override
- public void eval(BetweenContext context, Schema schema, Tuple param) {
- predicand.eval(context.predicandContext, schema, param);
- Datum predicandValue = predicand.terminate(context.predicandContext);
+ public Datum eval(Schema schema, Tuple param) {
+ Datum predicandValue = predicand.eval(schema, param);
- if (!(predicandValue instanceof NullDatum)) {
- context.result =
- DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(begin).asBool()
+ if (!predicandValue.isNull()) {
+ return DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(begin).asBool()
&& predicandValue.lessThanEqual(end).asBool()));
} else {
- context.result = NullDatum.get();
+ return NullDatum.get();
}
}
}
@@ -97,20 +95,17 @@ public class BetweenPredicateEval extends EvalNode {
}
@Override
- public void eval(BetweenContext context, Schema schema, Tuple param) {
- predicand.eval(context.predicandContext, schema, param);
- Datum predicandValue = predicand.terminate(context.predicandContext);
- begin.eval(context.beginContext, schema, param);
- Datum beginValue = begin.terminate(context.beginContext);
- end.eval(context.endContext, schema, param);
- Datum endValue = begin.terminate(context.endContext);
-
- if (!(predicandValue instanceof NullDatum || beginValue instanceof NullDatum || endValue instanceof NullDatum)) {
- context.result =
+ public Datum eval(Schema schema, Tuple param) {
+ Datum predicandValue = predicand.eval(schema, param);
+ Datum beginValue = begin.eval(schema, param);
+ Datum endValue = end.eval(schema, param);
+
+ if (!(predicandValue.isNull() || beginValue.isNull() || endValue.isNull())) {
+ return
DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(beginValue).asBool()
&& predicandValue.lessThanEqual(endValue).asBool()));
} else {
- context.result = NullDatum.get();
+ return NullDatum.get();
}
}
}
@@ -129,31 +124,23 @@ public class BetweenPredicateEval extends EvalNode {
}
@Override
- public void eval(BetweenContext context, Schema schema, Tuple param) {
- predicand.eval(context.predicandContext, schema, param);
- Datum predicandValue = predicand.terminate(context.predicandContext);
- begin.eval(context.beginContext, schema, param);
- Datum beginValue = begin.terminate(context.beginContext);
- end.eval(context.endContext, schema, param);
- Datum endValue = begin.terminate(context.endContext);
-
- if (!(predicandValue instanceof NullDatum || beginValue instanceof NullDatum || endValue instanceof NullDatum)) {
- context.result = DatumFactory.createBool( not ^
+ public Datum eval(Schema schema, Tuple param) {
+ Datum predicandValue = predicand.eval(schema, param);
+ Datum beginValue = begin.eval(schema, param);
+ Datum endValue = end.eval(schema, param);
+
+ if (!(predicandValue.isNull()|| beginValue.isNull() || endValue.isNull())) {
+ return DatumFactory.createBool( not ^
(predicandValue.greaterThanEqual(beginValue).asBool() && predicandValue.lessThanEqual(endValue).asBool()) ||
(predicandValue.lessThanEqual(beginValue).asBool() && predicandValue.greaterThanEqual(endValue).asBool())
);
} else {
- context.result = NullDatum.get();
+ return NullDatum.get();
}
}
}
@Override
- public EvalContext newContext() {
- return new BetweenContext();
- }
-
- @Override
public TajoDataTypes.DataType getValueType() {
return RES_TYPE;
}
@@ -169,14 +156,14 @@ public class BetweenPredicateEval extends EvalNode {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+ public Datum eval(Schema schema, Tuple tuple) {
if (checker == null) {
if (begin.getType() == EvalType.CONST && end.getType() == EvalType.CONST) {
- Datum beginValue = begin.terminate(null);
- Datum endValue = end.terminate(null);
+ Datum beginValue = ((ConstEval)begin).getValue();
+ Datum endValue = ((ConstEval)end).getValue();
if (symmetric || beginValue.compareTo(endValue) <= 0) {
- checker = new ConstantChecker(not, predicand, begin.terminate(null), end.terminate(null));
+ checker = new ConstantChecker(not, predicand, beginValue, endValue);
} else {
checker = new AsymmetricChecker(not, predicand, begin, end);
}
@@ -189,12 +176,7 @@ public class BetweenPredicateEval extends EvalNode {
}
}
- checker.eval((BetweenContext) ctx, schema, tuple);
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- return ((BetweenContext)ctx).result;
+ return checker.eval(schema, tuple);
}
@Override
@@ -207,19 +189,6 @@ public class BetweenPredicateEval extends EvalNode {
return false;
}
- private class BetweenContext implements EvalContext {
- private EvalContext predicandContext;
- private EvalContext beginContext;
- private EvalContext endContext;
- private Datum result;
-
- BetweenContext() {
- predicandContext = predicand.newContext();
- beginContext = begin.newContext();
- endContext = end.newContext();
- }
- }
-
@Deprecated
public void preOrder(EvalNodeVisitor visitor) {
visitor.visit(this);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
index ba21b4a..d362927 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
@@ -34,11 +34,6 @@ import static org.apache.tajo.common.TajoDataTypes.Type;
public class BinaryEval extends EvalNode implements Cloneable {
@Expose private DataType returnType = null;
- private class BinaryEvalCtx implements EvalContext {
- EvalContext left;
- EvalContext right;
- }
-
/**
* @param type
*/
@@ -75,15 +70,6 @@ public class BinaryEval extends EvalNode implements Cloneable {
this(expr.type, expr.leftExpr, expr.rightExpr);
}
- @Override
- public EvalContext newContext() {
- BinaryEvalCtx newCtx = new BinaryEvalCtx();
- newCtx.left = leftExpr.newContext();
- newCtx.right = rightExpr.newContext();
-
- return newCtx;
- }
-
/**
* This is verified by ExprsVerifier.checkArithmeticOperand().
*/
@@ -134,17 +120,9 @@ public class BinaryEval extends EvalNode implements Cloneable {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- BinaryEvalCtx binCtx = (BinaryEvalCtx) ctx;
- leftExpr.eval(binCtx == null ? null : binCtx.left, schema, tuple);
- rightExpr.eval(binCtx == null ? null : binCtx.right, schema, tuple);
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- BinaryEvalCtx binCtx = (BinaryEvalCtx) ctx;
- Datum lhs = leftExpr.terminate(binCtx.left);
- Datum rhs = rightExpr.terminate(binCtx.right);
+ public Datum eval(Schema schema, Tuple tuple) {
+ Datum lhs = leftExpr.eval(schema, tuple);
+ Datum rhs = rightExpr.eval(schema, tuple);
switch(type) {
case AND:
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
index 9bbf4b4..d08bfd3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.storage.Tuple;
@@ -62,11 +62,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
}
@Override
- public EvalContext newContext() {
- return new CaseContext(whens, elseResult != null ? elseResult.newContext() : null);
- }
-
- @Override
public DataType getValueType() {
return whens.get(0).getResultExpr().getValueType();
}
@@ -76,30 +71,18 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
return "?";
}
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- CaseContext caseCtx = (CaseContext) ctx;
+ public Datum eval(Schema schema, Tuple tuple) {
for (int i = 0; i < whens.size(); i++) {
- whens.get(i).eval(caseCtx.contexts[i], schema, tuple);
+ if (whens.get(i).checkIfCondition(schema, tuple)) {
+ return whens.get(i).eval(schema, tuple);
+ }
}
if (elseResult != null) { // without else clause
- elseResult.eval(caseCtx.elseCtx, schema, tuple);
+ return elseResult.eval(schema, tuple);
}
- }
- @Override
- public Datum terminate(EvalContext ctx) {
- CaseContext caseCtx = (CaseContext) ctx;
- for (int i = 0; i < whens.size(); i++) {
- if (whens.get(i).terminate(caseCtx.contexts[i]).asBool()) {
- return whens.get(i).getThenResult(caseCtx.contexts[i]);
- }
- }
- if (elseResult != null) { // without else clause
- return elseResult.terminate(caseCtx.elseCtx);
- } else {
- return DatumFactory.createNullDatum();
- }
+ return NullDatum.get();
}
@Override
@@ -163,11 +146,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
}
@Override
- public EvalContext newContext() {
- return new WhenContext(condition.newContext(), result.newContext());
- }
-
- @Override
public DataType getValueType() {
return CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
}
@@ -177,14 +155,12 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
return "when?";
}
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- condition.eval(((WhenContext) ctx).condCtx, schema, tuple);
- result.eval(((WhenContext) ctx).resultCtx, schema, tuple);
+ public boolean checkIfCondition(Schema schema, Tuple tuple) {
+ return condition.eval(schema, tuple).isTrue();
}
- @Override
- public Datum terminate(EvalContext ctx) {
- return condition.terminate(((WhenContext) ctx).condCtx);
+ public Datum eval(Schema schema, Tuple tuple) {
+ return result.eval(schema, tuple);
}
public EvalNode getConditionExpr() {
@@ -215,20 +191,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
return CoreGsonHelper.toJson(IfThenEval.this, IfThenEval.class);
}
- private class WhenContext implements EvalContext {
- EvalContext condCtx;
- EvalContext resultCtx;
-
- public WhenContext(EvalContext condCtx, EvalContext resultCtx) {
- this.condCtx = condCtx;
- this.resultCtx = resultCtx;
- }
- }
-
- public Datum getThenResult(EvalContext ctx) {
- return result.terminate(((WhenContext) ctx).resultCtx);
- }
-
@Override
public void preOrder(EvalNodeVisitor visitor) {
visitor.visit(this);
@@ -243,17 +205,4 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
visitor.visit(this);
}
}
-
- private class CaseContext implements EvalContext {
- EvalContext [] contexts;
- EvalContext elseCtx;
-
- CaseContext(List<IfThenEval> whens, EvalContext elseCtx) {
- contexts = new EvalContext[whens.size()];
- for (int i = 0; i < whens.size(); i++) {
- contexts[i] = whens.get(i).newContext();
- }
- this.elseCtx = elseCtx;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
index 033800d..9ff3df1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
@@ -41,13 +41,6 @@ public class CastEval extends EvalNode {
}
@Override
- public EvalContext newContext() {
- CastContext castContext = new CastContext();
- castContext.childCtx = operand.newContext();
- return castContext;
- }
-
- @Override
public DataType getValueType() {
return target;
}
@@ -57,15 +50,8 @@ public class CastEval extends EvalNode {
return target.getType().name();
}
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- CastContext castContext = (CastContext) ctx;
- operand.eval(castContext.childCtx , schema, tuple);
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- CastContext castContext = (CastContext) ctx;
- Datum operandDatum = operand.terminate(castContext.childCtx);
+ public Datum eval(Schema schema, Tuple tuple) {
+ Datum operandDatum = operand.eval(schema, tuple);
if (operandDatum.isNull()) {
return operandDatum;
}
@@ -97,8 +83,7 @@ public class CastEval extends EvalNode {
case BLOB:
return DatumFactory.createBlob(operandDatum.asByteArray());
default:
- throw new InvalidCastException("Cannot cast " + operand.getValueType().getType() + " to "
- + target.getType());
+ throw new InvalidCastException("Cannot cast " + operand.getValueType().getType() + " to " + target.getType());
}
}
@@ -127,8 +112,4 @@ public class CastEval extends EvalNode {
operand.postOrder(visitor);
visitor.visit(this);
}
-
- static class CastContext implements EvalContext {
- EvalContext childCtx;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
index 6d7f67f..2cb530d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
@@ -21,8 +21,10 @@ package org.apache.tajo.engine.eval;
import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
public class ConstEval extends EvalNode implements Comparable<ConstEval>, Cloneable {
@Expose Datum datum = null;
@@ -32,17 +34,6 @@ public class ConstEval extends EvalNode implements Comparable<ConstEval>, Clonea
this.datum = datum;
}
- @Override
- public EvalContext newContext() {
- return null;
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- return this.datum;
- }
-
-
public Datum getValue() {
return this.datum;
}
@@ -52,6 +43,11 @@ public class ConstEval extends EvalNode implements Comparable<ConstEval>, Clonea
}
@Override
+ public Datum eval(Schema schema, Tuple tuple) {
+ return datum;
+ }
+
+ @Override
public DataType getValueType() {
return CatalogUtil.newSimpleDataType(datum.type());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
deleted file mode 100644
index 00007ce..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.eval;
-
-public interface EvalContext {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
index 2e797fb..5578043 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
@@ -40,8 +40,6 @@ public abstract class EvalNode implements Cloneable, GsonObject {
this.leftExpr = left;
this.rightExpr = right;
}
-
- public abstract EvalContext newContext();
public EvalType getType() {
return this.type;
@@ -86,9 +84,7 @@ public abstract class EvalNode implements Cloneable, GsonObject {
return CoreGsonHelper.toJson(this, EvalNode.class);
}
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {}
-
- public abstract <T extends Datum> T terminate(EvalContext ctx);
+ public abstract <T extends Datum> T eval(Schema schema, Tuple tuple);
@Deprecated
public void preOrder(EvalNodeVisitor visitor) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 4044217..da05739 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.eval;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.tajo.catalog.Column;
@@ -277,10 +276,10 @@ public class EvalTreeUtil {
}
}
- public static List<AggregationFunctionCallEval> findDistinctAggFunction(EvalNode expr) {
+ public static Set<AggregationFunctionCallEval> findDistinctAggFunction(EvalNode expr) {
AllAggFunctionFinder finder = new AllAggFunctionFinder();
expr.postOrder(finder);
- return Lists.newArrayList(finder.getAggregationFunction());
+ return finder.getAggregationFunction();
}
public static class AllAggFunctionFinder implements EvalNodeVisitor {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
index dd29a3b..dc9b35b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
@@ -40,32 +40,14 @@ public class FieldEval extends EvalNode implements Cloneable {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+ public Datum eval(Schema schema, Tuple tuple) {
if (fieldId == -1) {
fieldId = schema.getColumnId(column.getQualifiedName());
if (fieldId == -1) {
throw new IllegalStateException("No Such Column Reference: " + column + ", schema: " + schema);
}
}
- FieldEvalContext fieldCtx = (FieldEvalContext) ctx;
- fieldCtx.datum = tuple.get(fieldId);
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- return ((FieldEvalContext)ctx).datum;
- }
-
- @Override
- public EvalContext newContext() {
- return new FieldEvalContext();
- }
-
- private static class FieldEvalContext implements EvalContext {
- private Datum datum;
-
- public FieldEvalContext() {
- }
+ return tuple.get(fieldId);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
index c4906d7..0555bde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
@@ -64,12 +64,6 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
public boolean isDistinct() {
return funcDesc.getFuncType() == DISTINCT_AGGREGATION || funcDesc.getFuncType() == DISTINCT_UDA;
}
-
- @Override
- public EvalContext newContext() {
- FuncCallCtx newCtx = new FuncCallCtx(argEvals);
- return newCtx;
- }
public EvalNode [] getArgs() {
return this.argEvals;
@@ -84,9 +78,7 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
}
@Override
- public abstract void eval(EvalContext ctx, Schema schema, Tuple tuple);
-
- public abstract Datum terminate(EvalContext ctx);
+ public abstract Datum eval(Schema schema, Tuple tuple);
@Override
public String getName() {
@@ -149,14 +141,4 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
}
visitor.visit(this);
}
-
- protected class FuncCallCtx implements EvalContext {
- EvalContext [] argCtxs;
- FuncCallCtx(EvalNode [] argEvals) {
- argCtxs = new EvalContext[argEvals.length];
- for (int i = 0; i < argEvals.length; i++) {
- argCtxs[i] = argEvals[i].newContext();
- }
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
index aeb68aa..9446d70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
@@ -22,17 +22,15 @@ import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.function.GeneralFunction;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.function.GeneralFunction;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.TUtil;
public class GeneralFunctionEval extends FunctionEval {
@Expose protected GeneralFunction instance;
- private Tuple tuple;
private Tuple params = null;
- private Schema schema;
public GeneralFunctionEval(FunctionDesc desc, GeneralFunction instance, EvalNode[] givenArgs) {
super(EvalType.FUNCTION, desc, givenArgs);
@@ -44,27 +42,19 @@ public class GeneralFunctionEval extends FunctionEval {
* @see nta.query.executor.eval.Expr#evalVal(Tuple)
*/
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- this.schema = schema;
- this.tuple = tuple;
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- FuncCallCtx localCtx = (FuncCallCtx) ctx;
+ public Datum eval(Schema schema, Tuple tuple) {
if (this.params == null) {
params = new VTuple(argEvals.length);
}
-
if(argEvals != null) {
params.clear();
for(int i=0;i < argEvals.length; i++) {
- argEvals[i].eval(localCtx.argCtxs[i], schema, tuple);
- params.put(i, argEvals[i].terminate(localCtx.argCtxs[i]));
+ params.put(i, argEvals[i].eval(schema, tuple));
}
}
+
return instance.eval(params);
- }
+ }
@Override
public boolean equals(Object obj) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e113326..189e9dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -25,7 +25,6 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.Tuple;
public class InEval extends BinaryEval {
@@ -45,11 +44,6 @@ public class InEval extends BinaryEval {
}
@Override
- public EvalContext newContext() {
- return new InEvalCtx();
- }
-
- @Override
public TajoDataTypes.DataType getValueType() {
return RES_TYPE;
}
@@ -60,37 +54,27 @@ public class InEval extends BinaryEval {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- InEvalCtx isNullCtx = (InEvalCtx) ctx;
+ public Datum eval(Schema schema, Tuple tuple) {
if (fieldId == null) {
fieldId = schema.getColumnId(((FieldEval)leftExpr).getColumnRef().getQualifiedName());
values = ((RowConstantEval)rightExpr).getValues();
}
- boolean isIncluded = false;
-
Datum value = tuple.get(fieldId);
if (value.isNull()) {
- isNullCtx.isNull = true;
- return;
+ return value;
}
+ boolean isIncluded = false;
for (Datum datum : values) {
if (value.equalsTo(datum).asBool()) {
isIncluded = true;
break;
}
}
- isNullCtx.result = isIncluded;
- }
- @Override
- public Datum terminate(EvalContext ctx) {
- if (((InEvalCtx)ctx).isNull) {
- return NullDatum.get();
- }
- return DatumFactory.createBool(not ^ ((InEvalCtx)ctx).result);
+ return DatumFactory.createBool(not ^ isIncluded);
}
@Override
@@ -105,9 +89,4 @@ public class InEval extends BinaryEval {
public String toString() {
return leftExpr + " IN (" + rightExpr + ")";
}
-
- private class InEvalCtx implements EvalContext {
- boolean isNull;
- boolean result;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
index 8bb4d95..5704aa5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
@@ -23,7 +23,6 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.BooleanDatum;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.Tuple;
@@ -42,13 +41,6 @@ public class IsNullEval extends BinaryEval {
}
@Override
- public EvalContext newContext() {
- IsNullEvalCtx context = new IsNullEvalCtx();
- context.predicandContext = leftExpr.newContext();
- return context;
- }
-
- @Override
public DataType getValueType() {
return RES_TYPE;
}
@@ -64,16 +56,9 @@ public class IsNullEval extends BinaryEval {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- IsNullEvalCtx isNullCtx = (IsNullEvalCtx) ctx;
- leftExpr.eval(isNullCtx.predicandContext, schema, tuple);
- Datum result = leftExpr.terminate(((IsNullEvalCtx)ctx).predicandContext);
- ((IsNullEvalCtx) ctx).result = DatumFactory.createBool(isNot ^ (result.type() == TajoDataTypes.Type.NULL_TYPE));
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- return ((IsNullEvalCtx)ctx).result;
+ public Datum eval(Schema schema, Tuple tuple) {
+ boolean isNull = leftExpr.eval(schema, tuple).isNull();
+ return DatumFactory.createBool(isNot ^ isNull);
}
public boolean isNot() {
@@ -96,13 +81,4 @@ public class IsNullEval extends BinaryEval {
return isNullEval;
}
-
- private class IsNullEvalCtx implements EvalContext {
- EvalContext predicandContext;
- BooleanDatum result;
-
- IsNullEvalCtx() {
- this.result = DatumFactory.createBool(false);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
index 677708d..1a16af4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
@@ -36,13 +36,6 @@ public class NotEval extends EvalNode implements Cloneable {
this.childEval = childEval;
}
- @Override
- public EvalContext newContext() {
- NotEvalCtx newCtx = new NotEvalCtx();
- newCtx.childExprCtx = childEval.newContext();
- return newCtx;
- }
-
public EvalNode getChild() {
return childEval;
}
@@ -58,13 +51,8 @@ public class NotEval extends EvalNode implements Cloneable {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- childEval.eval(((NotEvalCtx) ctx).childExprCtx, schema, tuple);
- }
-
- @Override
- public Datum terminate(EvalContext ctx) {
- Datum datum = childEval.terminate(((NotEvalCtx) ctx).childExprCtx);
+ public Datum eval(Schema schema, Tuple tuple) {
+ Datum datum = childEval.eval(schema, tuple);
return !datum.isNull() ? DatumFactory.createBool(!datum.asBool()) : datum;
}
@@ -101,8 +89,4 @@ public class NotEval extends EvalNode implements Cloneable {
eval.childEval = (EvalNode) this.childEval.clone();
return eval;
}
-
- private class NotEvalCtx implements EvalContext {
- EvalContext childExprCtx;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
index 73a68cd..0f4411d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
@@ -34,11 +34,6 @@ public class PartialBinaryExpr extends EvalNode {
}
@Override
- public EvalContext newContext() {
- return null;
- }
-
- @Override
public DataType getValueType() {
return null;
}
@@ -49,20 +44,13 @@ public class PartialBinaryExpr extends EvalNode {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+ public Datum eval(Schema schema, Tuple tuple) {
throw new InvalidEvalException(
"ERROR: the partial binary expression cannot be evluated: "
+ this.toString() );
}
@Override
- public Datum terminate(EvalContext ctx) {
- throw new InvalidEvalException(
- "ERROR: the partial binary expression cannot be terminated: "
- + this.toString() );
- }
-
- @Override
public boolean equals(Object obj) {
if (obj instanceof PartialBinaryExpr) {
PartialBinaryExpr other = (PartialBinaryExpr) obj;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
index 568af0c..8d78b0b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
@@ -39,8 +39,6 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
@Expose protected boolean caseInsensitive;
// transient variables
- private EvalContext leftContext;
- private boolean isNullResult = false;
protected Pattern compiled;
public PatternMatchPredicateEval(EvalType evalType, boolean not, EvalNode predicand, ConstEval pattern,
@@ -68,29 +66,17 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- if (leftContext == null) {
- leftContext = leftExpr.newContext();
+ public Datum eval(Schema schema, Tuple tuple) {
+ if (this.compiled == null) {
compile(this.pattern);
}
- leftExpr.eval(leftContext, schema, tuple);
- Datum predicand = leftExpr.terminate(leftContext);
- isNullResult = predicand instanceof NullDatum;
- boolean matched = compiled.matcher(predicand.asChars()).matches();
- ((PatternMatchPredicateContext)ctx).result = matched ^ not;
- }
-
- public Datum terminate(EvalContext ctx) {
- return !isNullResult ?
- DatumFactory.createBool(((PatternMatchPredicateContext)ctx).result) : NullDatum.get();
- }
-
- public EvalContext newContext() {
- return new PatternMatchPredicateContext();
- }
+ Datum predicand = leftExpr.eval(schema, tuple);
+ if (predicand.isNull()) {
+ return NullDatum.get();
+ }
- private class PatternMatchPredicateContext implements EvalContext {
- public boolean result = false;
+ boolean matched = compiled.matcher(predicand.asChars()).matches();
+ return DatumFactory.createBool(matched ^ not);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
index 1fc6a21..646a627 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
@@ -20,7 +20,10 @@ package org.apache.tajo.engine.eval;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.TUtil;
import static org.apache.tajo.common.TajoDataTypes.DataType;
@@ -34,11 +37,6 @@ public class RowConstantEval extends EvalNode {
}
@Override
- public EvalContext newContext() {
- return null;
- }
-
- @Override
public DataType getValueType() {
return CatalogUtil.newSimpleDataType(values[0].type());
}
@@ -49,8 +47,12 @@ public class RowConstantEval extends EvalNode {
}
@Override
- public Datum terminate(EvalContext ctx) {
- return null;
+ public Datum eval(Schema schema, Tuple tuple) {
+ return NullDatum.get();
+ }
+
+ public Datum [] getValues() {
+ return values;
}
@Override
@@ -67,10 +69,6 @@ public class RowConstantEval extends EvalNode {
return TUtil.arrayToString(values);
}
- public Datum [] getValues() {
- return values;
- }
-
public void preOrder(EvalNodeVisitor visitor) {
visitor.visit(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
index b0ba813..e0f50c1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.eval;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NumericDatum;
import org.apache.tajo.storage.Tuple;
@@ -34,13 +35,6 @@ public class SignedEval extends EvalNode implements Cloneable {
this.childEval = childEval;
}
- @Override
- public EvalContext newContext() {
- SignedEvalCtx newCtx = new SignedEvalCtx();
- newCtx.childExprCtx = childEval.newContext();
- return newCtx;
- }
-
public boolean isNegative() {
return negative;
}
@@ -60,13 +54,8 @@ public class SignedEval extends EvalNode implements Cloneable {
}
@Override
- public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
- childEval.eval(((SignedEvalCtx) ctx).childExprCtx, schema, tuple);
- }
-
- @Override
- public NumericDatum terminate(EvalContext ctx) {
- NumericDatum result = childEval.terminate(((SignedEvalCtx) ctx).childExprCtx);
+ public Datum eval(Schema schema, Tuple tuple) {
+ NumericDatum result = childEval.eval(schema, tuple);
if (negative) {
result.inverseSign();
}
@@ -107,8 +96,4 @@ public class SignedEval extends EvalNode implements Cloneable {
eval.childEval = (EvalNode) this.childEval.clone();
return eval;
}
-
- private class SignedEvalCtx implements EvalContext {
- EvalContext childExprCtx;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
index a665730..c473ac1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
@@ -46,6 +46,10 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
return true;
}
+ if (checkDateTime(dataType1) && checkDateTime(dataType2)) {
+ return true;
+ }
+
return false;
}
@@ -109,8 +113,7 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
private static void checkArithmeticOperand(VerificationState state, BinaryEval evalNode) {
EvalNode leftExpr = evalNode.getLeftExpr();
EvalNode rightExpr = evalNode.getRightExpr();
- if (!(checkNumericType(leftExpr.getValueType())
- && checkNumericType(rightExpr.getValueType()))) {
+ if (!(checkNumericType(leftExpr.getValueType()) && checkNumericType(rightExpr.getValueType()))) {
state.addVerification("No operator matches the given name and argument type(s): " + evalNode.toString());
}
}
@@ -125,6 +128,12 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
return Type.CHAR.getNumber() < typeNumber && typeNumber <= Type.TEXT.getNumber();
}
+ private static boolean checkDateTime(DataType dataType) {
+ int typeNumber = dataType.getType().getNumber();
+ return (Type.DATE.getNumber() < typeNumber && typeNumber <= Type.INTERVAL.getNumber()) ||
+ (Type.TIMEZ.getNumber() < typeNumber && typeNumber <= Type.TIMESTAMPZ.getNumber());
+ }
+
@Override
public EvalNode visitPlus(VerificationState state, BinaryEval evalNode, Stack<EvalNode> stack) {
super.visitDivide(state, evalNode, stack);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index ac76f57..9c58b7d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -64,13 +64,6 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
public LogicalNode visitGroupBy(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
visit(state, plan, block, node.getChild(), stack);
-
- if (node.hasTargets()) {
- for (Target target : node.getTargets()) {
- ExprsVerifier.verify(state, target.getEvalTree());
- }
- }
-
return node;
}
@@ -172,10 +165,6 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = visit(state, plan, block, node.getChild(), stack);
-// if (node.hasTargetSchema()) {
-// ensureDomains(state, node.getTargetSchema(), node.getChild().getOutSchema());
-// }
-
return child;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 8995226..066a3bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -19,7 +19,6 @@
package org.apache.tajo.engine.planner;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,16 +35,15 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.exception.VerifyException;
import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.util.TUtil;
import java.util.*;
-import static org.apache.tajo.algebra.Aggregation.GroupType;
import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
import static org.apache.tajo.algebra.CreateTable.PartitionType;
import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
@@ -225,6 +223,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// It sets raw targets, all of them are raw expressions instead of references.
setRawTargets(context, targets, referenceNames, projection);
+ verifyProjectedFields(block, projectionNode);
return projectionNode;
}
@@ -333,6 +332,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
private Target [] buildTargets(LogicalPlan plan, QueryBlock block, String[] referenceNames)
throws PlanningException {
Target [] targets = new Target[referenceNames.length];
+
for (int i = 0; i < referenceNames.length; i++) {
if (block.namedExprsMgr.isResolved(referenceNames[i])) {
targets[i] = block.namedExprsMgr.getTarget(referenceNames[i]);
@@ -346,6 +346,44 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return targets;
}
+ private static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
+ if (projectable instanceof ProjectionNode && block.hasNode(NodeType.GROUP_BY)) {
+ for (Target target : projectable.getTargets()) {
+ Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+ for (Column c : columns) {
+ if (!projectable.getInSchema().contains(c)) {
+ throw new PlanningException(c.getQualifiedName()
+ + " must appear in the GROUP BY clause or be used in an aggregate function");
+ }
+ }
+ }
+ } else if (projectable instanceof GroupbyNode) {
+ GroupbyNode groupbyNode = (GroupbyNode) projectable;
+ for (Column c : groupbyNode.getGroupingColumns()) {
+ if (!projectable.getInSchema().contains(c)) {
+ throw new PlanningException("Cannot get such a field: " + c);
+ }
+ }
+ for (AggregationFunctionCallEval f : groupbyNode.getAggFunctions()) {
+ Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(f);
+ for (Column c : columns) {
+ if (!projectable.getInSchema().contains(c)) {
+ throw new PlanningException("Cannot get such a field: " + c);
+ }
+ }
+ }
+ } else {
+ for (Target target : projectable.getTargets()) {
+ Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+ for (Column c : columns) {
+ if (!projectable.getInSchema().contains(c)) {
+ throw new PlanningException("Cannot get such a field: " + c);
+ }
+ }
+ }
+ }
+ }
+
/**
* Insert a group-by operator before a sort or a projection operator.
* It is used only when a group-by clause is not given.
@@ -358,22 +396,26 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
GroupbyNode groupbyNode = new GroupbyNode(plan.newPID());
groupbyNode.setGroupingColumns(new Column[] {});
- Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+ Set<String> aggEvalNames = new LinkedHashSet<String>();
+ Set<AggregationFunctionCallEval> aggEvals = new LinkedHashSet<AggregationFunctionCallEval>();
boolean includeDistinctFunction = false;
for (Iterator<NamedExpr> it = block.namedExprsMgr.getUnresolvedExprs(); it.hasNext();) {
NamedExpr rawTarget = it.next();
try {
includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+ if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+ aggEvalNames.add(rawTarget.getAlias());
+ aggEvals.add((AggregationFunctionCallEval) evalNode);
block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
- evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
}
} catch (VerifyException ve) {
}
}
groupbyNode.setDistinct(includeDistinctFunction);
- groupbyNode.setTargets(evaluatedTargets.toArray(new Target[evaluatedTargets.size()]));
+ groupbyNode.setAggFunctions(aggEvals.toArray(new AggregationFunctionCallEval[aggEvals.size()]));
+ Target [] targets = ProjectionPushDownRule.buildGroupByTarget(groupbyNode, aggEvalNames.toArray(new String[aggEvalNames.size()]));
+ groupbyNode.setTargets(targets);
groupbyNode.setChild(child);
groupbyNode.setInSchema(child.getOutSchema());
@@ -429,7 +471,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
limitNode.setInSchema(child.getOutSchema());
limitNode.setOutSchema(child.getOutSchema());
- limitNode.setFetchFirst(firstFetNum.terminate(null).asInt8());
+ limitNode.setFetchFirst(firstFetNum.eval(null, null).asInt8());
return limitNode;
}
@@ -558,53 +600,64 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
groupingNode.setChild(child);
groupingNode.setInSchema(child.getOutSchema());
+ // Set grouping sets
+ Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
+ for (int i = 0; i < groupingColumns.length; i++) {
+ if (block.namedExprsMgr.isResolved(groupingKeyRefNames[i])) {
+ groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
+ } else {
+ throw new PlanningException("Each grouping column expression must be a scalar expression.");
+ }
+ }
+ groupingNode.setGroupingColumns(groupingColumns);
+
+ ////////////////////////////////////////////////////////
+ // Visit and Build Child Plan
+ ////////////////////////////////////////////////////////
// create EvalNodes and check if each EvalNode can be evaluated here.
- Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+ List<String> aggEvalNames = TUtil.newList();
+ List<AggregationFunctionCallEval> aggEvalNodes = TUtil.newList();
boolean includeDistinctFunction = false;
- for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+ for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getUnresolvedExprs(); iterator.hasNext();) {
+ NamedExpr namedExpr = iterator.next();
try {
- includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
- EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
- block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
- evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
+ includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(namedExpr.getExpr());
+ EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, namedExpr.getExpr());
+ if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+ block.namedExprsMgr.resolveExpr(namedExpr.getAlias(), evalNode);
+ aggEvalNames.add(namedExpr.getAlias());
+ aggEvalNodes.add((AggregationFunctionCallEval) evalNode);
}
} catch (VerifyException ve) {
}
}
- // If there is at least one distinct aggregation function
+ // if there is at least one distinct aggregation function
groupingNode.setDistinct(includeDistinctFunction);
+ groupingNode.setAggFunctions(aggEvalNodes.toArray(new AggregationFunctionCallEval[aggEvalNodes.size()]));
+ Target [] targets = new Target[groupingKeyNum + aggEvalNames.size()];
- // Set grouping sets
- List<Target> targets = new ArrayList<Target>();
- Aggregation.GroupElement [] groupElements = aggregation.getGroupSet();
-
- // Currently, single ordinary grouping set is only available.
- if (groupElements[0].getType() == GroupType.OrdinaryGroup) {
- Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
- for (int i = 0; i < groupingColumns.length; i++) {
- if (block.namedExprsMgr.isResolved(groupingKeyRefNames[i])) {
- groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
- } else {
- throw new PlanningException("Each grouping column expression must be a scalar expression.");
- }
- }
+ // In target, grouping columns will be followed by aggregation evals.
+ //
+ // col1, col2, col3, sum(..), agv(..)
+ // ^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
+ // grouping keys aggregation evals
- for (Column column : groupingColumns) {
- if (child.getOutSchema().contains(column)) {
- targets.add(new Target(new FieldEval(child.getOutSchema().getColumn(column))));
- }
- }
- groupingNode.setGroupingColumns(groupingColumns);
- } else {
- throw new InvalidQueryException("Not support grouping");
+ // Build grouping keys
+ for (int i = 0; i < groupingKeyNum; i++) {
+ Target target = new Target(new FieldEval(groupingNode.getGroupingColumns()[i]));
+ targets[i] = target;
}
- targets.addAll(evaluatedTargets);
- groupingNode.setTargets(targets.toArray(new Target[targets.size()]));
+ for (int i = 0, targetIdx = groupingKeyNum; i < aggEvalNodes.size(); i++, targetIdx++) {
+ targets[targetIdx] = block.namedExprsMgr.getTarget(aggEvalNames.get(i));
+ }
+
+ groupingNode.setTargets(targets);
block.unsetAggregationRequire();
+
+ verifyProjectedFields(block, groupingNode);
return groupingNode;
}
@@ -892,6 +945,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
scanNode.setTargets(targets.toArray(new Target[targets.size()]));
+ verifyProjectedFields(block, scanNode);
return scanNode;
}
@@ -1487,25 +1541,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
===============================================================================================*/
public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode groupbyNode) {
- Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
-
- if (!groupbyNode.getInSchema().containsAll(columnRefs)) {
- return false;
- }
-
- Set<String> tableIds = Sets.newHashSet();
- // getting distinct table references
- for (Column col : columnRefs) {
- if (!tableIds.contains(col.getQualifier())) {
- tableIds.add(col.getQualifier());
- }
- }
-
- if (tableIds.size() > 1) {
- return false;
- }
-
- return true;
+ return checkIfBeEvaluateAtThis(evalNode, groupbyNode) && evalNode.getType() == EvalType.AGG_FUNCTION;
}
public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode joinNode,
@@ -1524,11 +1560,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// at the topmost join operator.
// TODO - It's also valid that case-when is evalauted at the topmost outer operator.
// But, how can we know there is no further outer join operator after this node?
- if (checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin)) {
- return true;
- } else {
- return false;
- }
+ return checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin);
}
private static boolean checkCaseWhenWithOuterJoin(QueryBlock block, EvalNode evalNode, boolean isTopMostJoin) {
@@ -1564,9 +1596,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public static boolean checkIfBeEvaluateAtThis(EvalNode evalNode, LogicalNode node) {
Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
- if (!node.getInSchema().containsAll(columnRefs)) {
- return false;
- }
- return true;
+ return node.getInSchema().containsAll(columnRefs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index ae5f4fc..663954e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -20,9 +20,11 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.CountRowsFunctionExpr;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.GeneralSetFunctionExpr;
+import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -30,8 +32,8 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.TUtil;
@@ -209,94 +211,6 @@ public class PlannerUtil {
parentNode.setChild(newNode);
}
- public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
- Preconditions.checkNotNull(groupBy);
-
- GroupbyNode child = null;
-
- // cloning groupby node
- try {
- child = (GroupbyNode) groupBy.clone();
- } catch (CloneNotSupportedException e) {
- e.printStackTrace();
- }
-
- List<Target> firstStepTargets = Lists.newArrayList();
- Target[] secondTargets = groupBy.getTargets();
- Target[] firstTargets = child.getTargets();
-
- Target second;
- Target first;
- int targetId = 0;
- for (int i = 0; i < firstTargets.length; i++) {
- second = secondTargets[i];
- first = firstTargets[i];
-
- List<AggregationFunctionCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
- List<AggregationFunctionCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
-
- if (firstStepFunctions.size() == 0) {
- firstStepTargets.add(first);
- targetId++;
- } else {
- for (AggregationFunctionCallEval func : firstStepFunctions) {
- Target newTarget;
-
- if (func.isDistinct()) {
- List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
- newTarget = new Target(new FieldEval(fields.get(0)));
- String targetName = "column_" + (targetId++);
- newTarget.setAlias(targetName);
-
- AggregationFunctionCallEval secondFunc = null;
- for (AggregationFunctionCallEval sf : secondStepFunctions) {
- if (func.equals(sf)) {
- secondFunc = sf;
- break;
- }
- }
-
- secondFunc.setArgs(new EvalNode [] {new FieldEval(
- new Column(targetName, newTarget.getEvalTree().getValueType()))});
- } else {
- func.setFirstPhase();
- String targetName = "column_" + (targetId++);
- newTarget = new Target(func, targetName);
- AggregationFunctionCallEval secondFunc = null;
- for (AggregationFunctionCallEval sf : secondStepFunctions) {
- if (func.equals(sf)) {
- secondFunc = sf;
- break;
- }
- }
- secondFunc.setArgs(new EvalNode [] {new FieldEval(
- new Column(targetName, newTarget.getEvalTree().getValueType()))});
- }
- firstStepTargets.add(newTarget);
- }
- }
-
- // Getting new target list and updating input/output schema from the new target list.
- Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
- Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
- List<Target> newTarget = Lists.newArrayList();
- for (Column column : groupBy.getGroupingColumns()) {
- if (!targetSchema.containsByQualifiedName(column.getQualifiedName())) {
- newTarget.add(new Target(new FieldEval(column)));
- }
- }
- targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
-
- child.setTargets(targetArray);
- // set the groupby chaining
- groupBy.setChild(child);
- groupBy.setInSchema(child.getOutSchema());
-
- }
- return child;
- }
-
-
/**
* Find the top logical node matched to type from the given node
*
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 77e2a0b..1843b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -18,17 +18,68 @@
package org.apache.tajo.engine.planner;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.Insert;
-import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.algebra.Projection;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
import java.util.Stack;
public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationState, Expr> {
+ private CatalogService catalog;
- public Expr visitInsert(VerificationState ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
- Expr child = super.visitInsert(ctx, stack, expr);
+ public PreLogicalPlanVerifier(CatalogService catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public Expr visitGroupBy(VerificationState ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+ Expr child = super.visitGroupBy(ctx, stack, expr);
+
+ // Enforcer only ordinary grouping set.
+ for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
+ if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
+ ctx.addVerification(groupingElement.getType() + " is not supported yet");
+ }
+ }
+
+ Projection projection = null;
+ for (Expr parent : stack) {
+ if (parent.getType() == OpType.Projection) {
+ projection = (Projection) parent;
+ break;
+ }
+ }
+
+ if (projection == null) {
+ throw new PlanningException("No Projection");
+ }
+
+ return expr;
+ }
+
+ @Override
+ public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
+ checkRelationExistence(state, expr.getName());
+ return expr;
+ }
+
+ private boolean checkRelationExistence(VerificationState state, String name) {
+ if (!catalog.existsTable(name)) {
+ state.addVerification(String.format("relation \"%s\" does not exist", name));
+ return false;
+ }
+ return true;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Insert or Update Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public Expr visitInsert(VerificationState context, Stack<Expr> stack, Insert expr) throws PlanningException {
+ Expr child = super.visitInsert(context, stack, expr);
+
+ if (expr.hasTableName()) {
+ checkRelationExistence(context, expr.getTableName());
+ }
if (child != null && child.getType() == OpType.Projection) {
if (expr.hasTargetColumns()) {
@@ -37,13 +88,13 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
int targetColumnNum = expr.getTargetColumns().length;
if (targetColumnNum > projectColumnNum) {
- ctx.addVerification("INSERT has more target columns than expressions");
+ context.addVerification("INSERT has more target columns than expressions");
} else if (targetColumnNum < projectColumnNum) {
- ctx.addVerification("INSERT has more expressions than target columns");
+ context.addVerification("INSERT has more expressions than target columns");
}
}
}
- return child;
+ return expr;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
index 8d6db22..161d39b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -19,7 +19,6 @@
package org.apache.tajo.engine.planner;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.storage.Tuple;
@@ -42,26 +41,11 @@ public class Projector {
}
}
- public void eval(EvalContext[] evalContexts, Tuple in) {
+ public void eval(Tuple in, Tuple out) {
if (targetNum > 0) {
for (int i = 0; i < evals.length; i++) {
- evals[i].eval(evalContexts[i], inSchema, in);
+ out.put(i, evals[i].eval(inSchema, in));
}
}
}
-
- public void terminate(EvalContext [] evalContexts, Tuple out) {
- for (int i = 0; i < targetNum; i++) {
- out.put(i, evals[i].terminate(evalContexts[i]));
- }
- }
-
- public EvalContext [] newContexts() {
- EvalContext [] evalContexts = new EvalContext[targetNum];
- for (int i = 0; i < targetNum; i++) {
- evalContexts[i] = evals[i].newContext();
- }
-
- return evalContexts;
- }
}