You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/08/23 19:38:12 UTC
[23/25] TAJO-906: Runtime code generation for evaluating expression
trees.
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
index 9ac0e62..a690759 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
@@ -24,6 +24,11 @@ import java.util.regex.PatternSyntaxException;
public class SimilarToPredicateEval extends PatternMatchPredicateEval {
private static final String SIMILARTO_ESCAPE_SPATIAL_CHARACTERS = "([.])";
+ public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern,
+ @SuppressWarnings("unused") boolean isCaseSensitive) {
+ super(EvalType.SIMILAR_TO, not, field, pattern, false);
+ }
+
public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern) {
super(EvalType.SIMILAR_TO, not, field, pattern);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
index 15e34de..15b628b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
@@ -60,9 +60,6 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
case IF_THEN:
result = visitIfThen(context, (CaseWhenEval.IfThenEval) evalNode, stack);
break;
- case IN:
- result = visitInPredicate(context, (InEval) evalNode, stack);
- break;
// Functions
case FUNCTION:
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
index 8c714c5..05640c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
@@ -32,12 +32,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, true);\n"
+ "true",
returnType = Type.BOOLEAN,
- paramTypes = {@ParamTypes(paramTypes = {Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})}
)
public class CoalesceBoolean extends Coalesce {
public CoalesceBoolean() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.BOOLEAN),
+ new Column("param", TajoDataTypes.Type.BOOLEAN),
new Column("params", TajoDataTypes.Type.BOOLEAN_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
index 23f8f0c..35df518 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, date '2014-01-01');\n"
+ "2014-01-01",
returnType = Type.DATE,
- paramTypes = {@ParamTypes(paramTypes = {Type.DATE, Type.DATE_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {Type.DATE_ARRAY})}
)
public class CoalesceDate extends Coalesce {
public CoalesceDate() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.DATE),
new Column("params", TajoDataTypes.Type.DATE_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
index 3e94150..47c363f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
@@ -31,12 +31,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, 10.0);\n"
+ "10.0",
returnType = TajoDataTypes.Type.FLOAT8,
- paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8, TajoDataTypes.Type.FLOAT8_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8_ARRAY})}
)
public class CoalesceDouble extends Coalesce {
public CoalesceDouble() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.FLOAT8),
new Column("params", TajoDataTypes.Type.FLOAT8_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
index 5d55255..f975615 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
@@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, 10);\n"
+ "10",
returnType = TajoDataTypes.Type.INT8,
- paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.INT8_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8_ARRAY})}
)
public class CoalesceLong extends Coalesce {
public CoalesceLong() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.INT8),
new Column("params", TajoDataTypes.Type.INT8_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
index 50e4786..6441e00 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
@@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, 'default');\n"
+ "default",
returnType = TajoDataTypes.Type.TEXT,
- paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT, TajoDataTypes.Type.TEXT_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT_ARRAY})}
)
public class CoalesceString extends Coalesce {
public CoalesceString() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.TEXT),
new Column("params", TajoDataTypes.Type.TEXT_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
index 01bb6de..56cfe32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, time '12:10:00');\n"
+ "12:10:00",
returnType = Type.TIME,
- paramTypes = {@ParamTypes(paramTypes = {Type.TIME, Type.TIME_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {Type.TIME_ARRAY})}
)
public class CoalesceTime extends Coalesce {
public CoalesceTime() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.TIME),
new Column("params", TajoDataTypes.Type.TIME_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
index 2609717..ec02e46 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
example = "> SELECT coalesce(null, null, timestamp '2014-01-01');\n"
+ "2014-01-01 00:00:00",
returnType = Type.TIMESTAMP,
- paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP, Type.TIMESTAMP_ARRAY})}
+ paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP_ARRAY})}
)
public class CoalesceTimestamp extends Coalesce {
public CoalesceTimestamp() {
super(new Column[] {
- new Column("column", TajoDataTypes.Type.TIMESTAMP),
new Column("params", TajoDataTypes.Type.TIMESTAMP_ARRAY),
});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index 6a3af98..574e32c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -439,11 +439,18 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
@Override
public EvalNode visitConcatenate(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
stack.push(expr);
- EvalNode left = visit(ctx, stack, expr.getLeft());
- EvalNode right = visit(ctx, stack, expr.getRight());
+ EvalNode lhs = visit(ctx, stack, expr.getLeft());
+ EvalNode rhs = visit(ctx, stack, expr.getRight());
stack.pop();
- return new BinaryEval(EvalType.CONCATENATE, left, right);
+ if (lhs.getValueType().getType() != Type.TEXT) {
+ lhs = convertType(lhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+ }
+ if (rhs.getValueType().getType() != Type.TEXT) {
+ rhs = convertType(rhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+ }
+
+ return new BinaryEval(EvalType.CONCATENATE, lhs, rhs);
}
private EvalNode visitPatternMatchPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
@@ -599,7 +606,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
// trying the implicit type conversion between actual parameter types and the definition types.
if (CatalogUtil.checkIfVariableLengthParamDefinition(TUtil.newList(funcDesc.getParamTypes()))) {
- DataType lastDataType = null;
+ DataType lastDataType = funcDesc.getParamTypes()[0];
for (int i = 0; i < givenArgs.length; i++) {
if (i < (funcDesc.getParamTypes().length - 1)) { // variable length
lastDataType = funcDesc.getParamTypes()[i];
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index e34548c..2730202 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -55,6 +55,7 @@ import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Stack;
@@ -147,6 +148,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
stack.push(selNode);
leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
stack.pop();
+
return new SelectionExec(ctx, selNode, leftExec);
case PROJECTION:
@@ -154,6 +156,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
stack.push(prjNode);
leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
stack.pop();
+
return new ProjectionExec(ctx, prjNode, leftExec);
case TABLE_SUBQUERY: {
@@ -210,6 +213,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
stack.pop();
+
return createJoinPlan(ctx, joinNode, leftExec, rightExec);
case UNION:
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index 161d39b..d8499d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -18,34 +18,49 @@
package org.apache.tajo.engine.planner;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
public class Projector {
+ private final TaskAttemptContext context;
private final Schema inSchema;
+ private final Target [] targets;
// for projection
private final int targetNum;
private final EvalNode[] evals;
- public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+ public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
+ this.context = context;
this.inSchema = inSchema;
if (targets == null) {
- targets = PlannerUtil.schemaToTargets(outSchema);
+ this.targets = PlannerUtil.schemaToTargets(outSchema);
+ } else {
+ this.targets = targets;
}
- this.targetNum = targets.length;
+
+ this.targetNum = this.targets.length;
evals = new EvalNode[targetNum];
- for (int i = 0; i < targetNum; i++) {
- evals[i] = targets[i].getEvalTree();
+
+ if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+ EvalNode eval;
+ for (int i = 0; i < targetNum; i++) {
+ eval = this.targets[i].getEvalTree();
+ evals[i] = context.getPrecompiledEval(inSchema, eval);
+ }
+ } else {
+ for (int i = 0; i < targetNum; i++) {
+ evals[i] = this.targets[i].getEvalTree();
+ }
}
}
public void eval(Tuple in, Tuple out) {
- if (targetNum > 0) {
- for (int i = 0; i < evals.length; i++) {
- out.put(i, evals[i].eval(inSchema, in));
- }
+ for (int i = 0; i < evals.length; i++) {
+ out.put(i, evals[i].eval(inSchema, in));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
index 6c45868..aa6d597 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
@@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlanString;
-public class HavingNode extends UnaryNode implements Cloneable {
+public class HavingNode extends UnaryNode implements SelectableNode, Cloneable {
@Expose private EvalNode qual;
public HavingNode(int pid) {
super(pid, NodeType.HAVING);
}
- public EvalNode getQual() {
- return this.qual;
- }
+ @Override
+ public boolean hasQual() {
+ return true;
+ }
- public void setQual(EvalNode qual) {
+ public void setQual(EvalNode qual) {
this.qual = qual;
}
+ public EvalNode getQual() {
+ return this.qual;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof HavingNode) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 9582dee..8d28e6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -30,7 +30,7 @@ import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.util.TUtil;
-public class ScanNode extends RelationNode implements Projectable, Cloneable {
+public class ScanNode extends RelationNode implements Projectable, SelectableNode, Cloneable {
@Expose protected TableDesc tableDesc;
@Expose protected String alias;
@Expose protected Schema logicalSchema;
@@ -101,6 +101,7 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable {
}
}
+ @Override
public Schema getTableSchema() {
return logicalSchema;
}
@@ -108,15 +109,18 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable {
public Schema getPhysicalSchema() {
return getInSchema();
}
-
+
+ @Override
public boolean hasQual() {
return qual != null;
}
-
+
+ @Override
public EvalNode getQual() {
return this.qual;
}
-
+
+ @Override
public void setQual(EvalNode evalTree) {
this.qual = evalTree;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
new file mode 100644
index 0000000..7082f4b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+
+/**
+ * An interface for logical node which is able to filter tuples.
+ */
+public interface SelectableNode {
+
+ /**
+ * Checking if it has filter condition
+ *
+ * @return True if it has filter condition. Otherwise, it will return false.
+ */
+ public boolean hasQual();
+
+ /**
+ * Set a filter condition.
+ *
+ * @param eval EvalNode resulting in a boolean result.
+ */
+ public void setQual(EvalNode eval);
+
+ /**
+ * Get a filter condition
+ *
+ * @return Filter Condition
+ */
+ public EvalNode getQual();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index b8a9680..3bbbd82 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlanString;
-public class SelectionNode extends UnaryNode implements Cloneable {
+public class SelectionNode extends UnaryNode implements SelectableNode, Cloneable {
@Expose private EvalNode qual;
public SelectionNode(int pid) {
super(pid, NodeType.SELECTION);
}
- public EvalNode getQual() {
- return this.qual;
- }
+ @Override
+ public boolean hasQual() {
+ return true;
+ }
- public void setQual(EvalNode qual) {
+ public void setQual(EvalNode qual) {
this.qual = qual;
}
+ public EvalNode getQual() {
+ return this.qual;
+ }
+
@Override
public PlanString getPlanString() {
PlanString planStr = new PlanString(this);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 60a7c19..91cefa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -79,13 +79,20 @@ public class BNLJoinExec extends BinaryPhysicalExec {
plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
}
- projector = new Projector(inSchema, outSchema, plan.getTargets());
+ projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
outputTuple = new VTuple(outSchema.size());
}
+ @Override
+ protected void compile() {
+ if (hasJoinQual) {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+ }
+
public JoinNode getPlan() {
return plan;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 35de707..f831525 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -59,7 +59,7 @@ public class BSTIndexScanExec extends PhysicalExec {
this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
this.fileScanner.init();
- this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
this.reader = new BSTIndex(sm.getFileSystem().getConf()).
getIndexReader(fileName, keySchema, comparator);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index f6f3e52..42611b0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -18,6 +18,8 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
+
import java.util.Stack;
public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index 628c18c..03ec396 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -52,6 +52,8 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
leftChild.init();
rightChild.init();
progress = 0.0f;
+
+ super.init();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 6215527..700e34d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.Scanner;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 65ebe2f..9dabbb3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
@@ -91,7 +92,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
}
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
@@ -102,6 +103,11 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
rightNumCols = inner.getSchema().size();
}
+ @Override
+ protected void compile() throws CompilationError {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
for (int i = 0; i < leftKeyList.length; i++) {
keyTuple.put(i, outerTuple.get(leftKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index a5e9df0..426a7a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -82,7 +82,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
}
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
@@ -90,6 +90,11 @@ public class HashJoinExec extends BinaryPhysicalExec {
leftKeyTuple = new VTuple(leftKeyList.length);
}
+ @Override
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
for (int i = 0; i < leftKeyList.length; i++) {
keyTuple.put(i, outerTuple.get(leftKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index ac8b28f..b752db5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -109,7 +109,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
}
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
@@ -119,6 +119,11 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
rightNumCols = rightChild.getSchema().size();
}
+ @Override
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
for (int i = 0; i < leftKeyList.length; i++) {
keyTuple.put(i, outerTuple.get(leftKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 4fbb5e4..4fdd03a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -45,6 +45,10 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
}
}
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
/**
* The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
* next() method finds the first unmatched tuple from both tables.
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index ff1f7b3..e1cc6a8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -88,7 +88,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
@@ -98,6 +98,11 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
rightNumCols = rightChild.getSchema().size();
}
+ @Override
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
public JoinNode getPlan(){
return this.joinNode;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 470e1c9..bbfe973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -84,13 +84,18 @@ public class MergeJoinExec extends BinaryPhysicalExec {
this.innerIterator = innerTupleSlots.iterator();
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
outTuple = new VTuple(outSchema.size());
}
+ @Override
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
public JoinNode getPlan(){
return this.joinNode;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 6e5900e..dc061ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -54,7 +54,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
}
// for projection
- projector = new Projector(inSchema, outSchema, plan.getTargets());
+ projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
needNewOuter = true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 5c17c40..37ef7df 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -57,7 +57,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
}
// for projection
- projector = new Projector(inSchema, outSchema, plan.getTargets());
+ projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
needNextRightTuple = true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 9fa5b76..d87a30d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -62,8 +62,9 @@ public class PartitionMergeScanExec extends PhysicalExec {
public void init() throws IOException {
for (CatalogProtos.FragmentProto fragment : fragments) {
- scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
- new CatalogProtos.FragmentProto[] {fragment}));
+ SeqScanExec scanExec = new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
+ new CatalogProtos.FragmentProto[] {fragment});
+ scanners.add(scanExec);
}
progress = 0.0f;
rescan();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index e30a10b..31cfc4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -20,9 +20,11 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -47,7 +49,14 @@ public abstract class PhysicalExec implements SchemaObject {
return outSchema;
}
- public abstract void init() throws IOException;
+ public void init() throws IOException {
+ if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+ this.compile();
+ }
+ }
+
+ protected void compile() throws CompilationError {
+ }
public abstract Tuple next() throws IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index 738db62..505b599 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -18,6 +18,8 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
+
import java.util.Stack;
public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index 0909c76..2f55cf7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
import org.apache.tajo.engine.query.QueryContext;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
deleted file mode 100644
index 62add1e..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import java.io.IOException;
-
-public class PhysicalPlanningException extends IOException {
- public PhysicalPlanningException(String message) {
- super(message);
- }
-
- public PhysicalPlanningException(Exception ioe) {
- super(ioe);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index ee6ef1d..89cd75a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -46,7 +46,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
super.init();
this.outTuple = new VTuple(outSchema.size());
- this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets());
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 365fc22..5d4dad5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -87,7 +87,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
plan.getJoinQual(), outer.getSchema(), inner.getSchema());
// for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
// for join
frameTuple = new FrameTuple();
@@ -96,6 +96,11 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
leftNumCols = outer.getSchema().size();
}
+ @Override
+ protected void compile() {
+ joinQual = context.getPrecompiledEval(inSchema, joinQual);
+ }
+
public JoinNode getPlan() {
return this.joinNode;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 2e676e9..5ae9a8f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.logical.SelectionNode;
import org.apache.tajo.storage.Tuple;
@@ -26,7 +27,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class SelectionExec extends UnaryPhysicalExec {
- private final EvalNode qual;
+ private EvalNode qual;
public SelectionExec(TaskAttemptContext context,
SelectionNode plan,
@@ -36,6 +37,11 @@ public class SelectionExec extends UnaryPhysicalExec {
}
@Override
+ public void compile() throws CompilationError {
+ qual = context.getPrecompiledEval(inSchema, qual);
+ }
+
+ @Override
public Tuple next() throws IOException {
Tuple tuple;
while ((tuple = child.next()) != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 507cb6c..2f0c12f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.engine.eval.ConstEval;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -66,8 +67,8 @@ public class SeqScanExec extends PhysicalExec {
private boolean cacheRead = false;
- public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
- ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
+ public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
+ CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
this.plan = plan;
@@ -87,6 +88,12 @@ public class SeqScanExec extends PhysicalExec {
cacheKey = new TupleCacheKey(
context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
}
+
+ if (fragments != null
+ && plan.getTableDesc().hasPartition()
+ && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
+ rewriteColumnPartitionedTableSchema();
+ }
}
/**
@@ -120,12 +127,16 @@ public class SeqScanExec extends PhysicalExec {
Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
ConstEval constExpr = new ConstEval(datum);
- for (Target target : plan.getTargets()) {
+
+ for (int i = 0; i < plan.getTargets().length; i++) {
+ Target target = plan.getTargets()[i];
+
if (target.getEvalTree().equals(targetExpr)) {
if (!target.hasAlias()) {
target.setAlias(target.getEvalTree().getName());
}
target.setExpr(constExpr);
+
} else {
EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
}
@@ -140,12 +151,6 @@ public class SeqScanExec extends PhysicalExec {
public void init() throws IOException {
Schema projected;
- if (fragments != null
- && plan.getTableDesc().hasPartition()
- && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
- rewriteColumnPartitionedTableSchema();
- }
-
if (plan.hasTargets()) {
projected = new Schema();
Set<Column> columnSet = new HashSet<Column>();
@@ -193,10 +198,19 @@ public class SeqScanExec extends PhysicalExec {
} else {
initScanner(projected);
}
+
+ super.init();
+ }
+
+ @Override
+ protected void compile() throws CompilationError {
+ if (plan.hasQual()) {
+ qual = context.getPrecompiledEval(inSchema, qual);
+ }
}
private void initScanner(Schema projected) throws IOException {
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
if (fragments != null) {
if (fragments.length > 1) {
this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index ab67d7b..2c63ea6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -46,13 +46,17 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
this.child = child;
}
+ @Override
public void init() throws IOException {
progress = 0.0f;
if (child != null) {
child.init();
}
+
+ super.init();
}
+ @Override
public void rescan() throws IOException {
progress = 0.0f;
if (child != null) {
@@ -60,6 +64,7 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
}
}
+ @Override
public void close() throws IOException {
progress = 1.0f;
if (child != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 56df48d..ef82427 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -161,7 +161,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.serializedData = p.getSerializedData();
return this.serializedData;
}
-
+
public boolean isInterQuery() {
QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
if (interQuery != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index fc1be9b..2c62d42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -60,6 +60,7 @@ import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -115,9 +116,20 @@ public class GlobalEngine extends AbstractService {
super.stop();
}
+ private QueryContext createQueryContext(Session session) {
+ QueryContext newQueryContext = new QueryContext(context.getConf(), session);
+
+ String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
+ if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
+ newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
+ }
+
+ return newQueryContext;
+ }
+
public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) {
LOG.info("Query: " + query);
- QueryContext queryContext = new QueryContext(context.getConf(), session);
+ QueryContext queryContext = createQueryContext(session);
Expr planningContext;
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
new file mode 100644
index 0000000..9a4a01d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.query.QueryContext;
+
+import java.util.Collection;
+
+public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent {
+ private final QueryContext queryContext;
+ private final String planJson;
+
+ public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId,
+ Collection<Container> containers, QueryContext queryContext, String planJson) {
+ super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers);
+ this.queryContext = queryContext;
+ this.planJson = planJson;
+ }
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
+ public String getPlanJson() {
+ return planJson;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index cb7861c..f3e4b72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -42,10 +43,15 @@ import java.util.ArrayList;
import java.util.List;
public class TajoContainerProxy extends ContainerProxy {
+ private final QueryContext queryContext;
+ private final String planJson;
+
public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
Configuration conf, Container container,
- ExecutionBlockId executionBlockId) {
+ QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
super(context, conf, executionBlockId, container);
+ this.queryContext = queryContext;
+ this.planJson = planJson;
}
@Override
@@ -101,6 +107,8 @@ public class TajoContainerProxy extends ContainerProxy {
.setNodeId(container.getNodeId().toString())
.setContainerId(container.getId().toString())
.setQueryOutputPath(context.getStagingDir().toString())
+ .setQueryContext(queryContext.getProto())
+ .setPlanJson(planJson)
.build();
tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 4e51460..8d4a6a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -214,7 +214,7 @@ public class TajoMaster extends CompositeService {
}
private void initWebServer() throws Exception {
- if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(),
true, null, context.getConf(), null);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index deadd39..3a86802 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -190,7 +190,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
- List<IntermediateEntryProto> intermediateEntries = new ArrayList<IntermediateEntryProto>();
for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
try {
if (worker.getPeerRpcPort() == 0) continue;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 40c5406..87da175 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -42,6 +42,8 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
@@ -1034,8 +1036,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
subQuery.eventHandler.handle(
- new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
- subQuery.getId(), allocationEvent.getAllocatedContainer())
+ new LaunchTaskRunnersEvent(subQuery.getId(), allocationEvent.getAllocatedContainer(),
+ subQuery.getContext().getQueryContext(),
+ CoreGsonHelper.toJson(subQuery.getBlock().getPlan(), LogicalNode.class))
);
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
new file mode 100644
index 0000000..a70fbfd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
+import org.apache.tajo.engine.codegen.TajoClassLoader;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.util.Pair;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ExecutionBlockSharedResource {
+ private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
+ private AtomicBoolean initializing = new AtomicBoolean(false);
+ private volatile Boolean resourceInitSuccess = new Boolean(false);
+ private CountDownLatch initializedResourceLatch = new CountDownLatch(1);
+
+ // Query
+ private QueryContext context;
+
+ // Resources
+ private TajoClassLoader classLoader;
+ private ExecutorPreCompiler.CompilationContext compilationContext;
+ private LogicalNode plan;
+ private boolean codeGenEnabled = false;
+
+ public void initialize(final QueryContext context, final String planJson) throws InterruptedException {
+
+ if (!initializing.getAndSet(true)) {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ExecutionBlockSharedResource.this.context = context;
+ initPlan(planJson);
+ initCodeGeneration();
+ resourceInitSuccess = true;
+ } catch (Throwable t) {
+ LOG.error(t);
+ LOG.error(ExceptionUtils.getStackTrace(t));
+ } finally {
+ initializedResourceLatch.countDown();
+ }
+ }
+ });
+ thread.run();
+ thread.join();
+
+ if (!resourceInitSuccess) {
+ throw new RuntimeException("Resource cannot be initialized");
+ }
+ }
+ }
+
+ private void initPlan(String planJson) {
+ plan = CoreGsonHelper.fromJson(planJson, LogicalNode.class);
+ }
+
+ private void initCodeGeneration() throws PlanningException {
+ if (context.getBool(SessionVars.CODEGEN)) {
+ codeGenEnabled = true;
+ classLoader = new TajoClassLoader();
+ compilationContext = new ExecutorPreCompiler.CompilationContext(classLoader);
+ ExecutorPreCompiler.compile(compilationContext, plan);
+ }
+ }
+
+ public boolean awaitInitializedResource() throws InterruptedException {
+ initializedResourceLatch.await();
+ return resourceInitSuccess;
+ }
+
+ public LogicalNode getPlan() {
+ return this.plan;
+ }
+
+ public EvalNode compileEval(Schema schema, EvalNode eval) {
+ return compilationContext.getCompiler().compile(schema, eval);
+ }
+
+ public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) {
+ if (codeGenEnabled) {
+
+ Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval);
+ if (compilationContext.getPrecompiedEvals().containsKey(key)) {
+ return compilationContext.getPrecompiedEvals().get(key);
+ } else {
+ try {
+ LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval);
+ return compileEval(schema, eval);
+ } catch (Throwable t) {
+ LOG.warn(t);
+ return eval;
+ }
+ }
+ } else {
+ throw new IllegalStateException("CodeGen is disabled");
+ }
+ }
+
+ public void release() {
+ compilationContext = null;
+
+ if (classLoader != null) {
+ try {
+ classLoader.clean();
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
+ classLoader = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c7e513d..aaff69c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -31,10 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.ContainerProxy;
-import org.apache.tajo.master.TajoContainerProxy;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
-import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.*;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
@@ -143,19 +140,20 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void handle(TaskRunnerGroupEvent event) {
if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
- launchTaskRunners(event.getExecutionBlockId(), event.getContainers());
+ LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event;
+ launchTaskRunners(launchEvent);
} else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
stopContainers(event.getContainers());
}
}
}
- private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+ private void launchTaskRunners(LaunchTaskRunnersEvent event) {
// Query in standby mode doesn't need launch Worker.
// But, Assign ExecutionBlock to assigned tajo worker
- for(Container eachContainer: containers) {
+ for(Container eachContainer: event.getContainers()) {
TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
- eachContainer, executionBlockId);
+ eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index d8d09e1..f76176d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathData;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.ha.TajoMasterInfo;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -55,6 +57,7 @@ import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -207,7 +210,7 @@ public class TajoWorker extends CompositeService {
addService(pullService);
}
- if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
try {
httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
if(queryMasterMode && !taskRunnerMode) {
@@ -349,6 +352,9 @@ public class TajoWorker extends CompositeService {
}
public class WorkerContext {
+ private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap =
+ new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>();
+
public QueryMaster getQueryMaster() {
if(queryMasterManagerService == null) {
return null;
@@ -356,6 +362,10 @@ public class TajoWorker extends CompositeService {
return queryMasterManagerService.getQueryMaster();
}
+ public TajoConf getConf() {
+ return systemConf;
+ }
+
public TajoWorkerManagerService getTajoWorkerManagerService() {
return tajoWorkerManagerService;
}
@@ -398,6 +408,25 @@ public class TajoWorker extends CompositeService {
}
}
+ public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson)
+ throws InterruptedException {
+
+ if (!sharedResourceMap.containsKey(blockId)) {
+ ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+ if (sharedResourceMap.putIfAbsent(blockId, resource) == null) {
+ resource.initialize(queryContext, planJson);
+ }
+ }
+ }
+
+ public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) {
+ return sharedResourceMap.get(blockId);
+ }
+
+ public void releaseSharedResource(ExecutionBlockId blockId) {
+ sharedResourceMap.remove(blockId).release();
+ }
+
protected void cleanup(String strPath) {
if(deletionService == null) return;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c5f1446..fa116c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,16 +25,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
import java.net.InetSocketAddress;
@@ -118,7 +116,12 @@ public class TajoWorkerManagerService extends CompositeService
TajoWorkerProtocol.RunExecutionBlockRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
+
try {
+ workerContext.initSharedResource(
+ new QueryContext(workerContext.getConf(), request.getQueryContext()),
+ TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson());
+
String[] params = new String[7];
params[0] = "standby"; //mode(never used)
params[1] = request.getExecutionBlockId();
@@ -132,8 +135,8 @@ public class TajoWorkerManagerService extends CompositeService
params[6] = request.getQueryOutputPath();
workerContext.getTaskRunnerManager().startTask(params);
done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
done.run(TajoWorker.FALSE_PROTO);
}
}
@@ -163,6 +166,9 @@ public class TajoWorkerManagerService extends CompositeService
workerContext.cleanup(inputDir);
String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
workerContext.cleanup(outputDir);
+
+ // Release shared resources
+ workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto));
}
done.run(TajoWorker.TRUE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 1881685..d0665ae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -157,6 +157,20 @@ public class Task {
this.reporter = new Reporter(taskId, masterProxy);
this.reporter.startCommunicationThread();
+
+ // resource intiailization
+ boolean resourceInitialized = false;
+ try {
+ resourceInitialized = context.getSharedResource().awaitInitializedResource();
+ } catch (InterruptedException e) {
+ LOG.error("Failed Resource Initialization", e);
+ } finally {
+ if (!resourceInitialized) {
+ setState(TaskAttemptState.TA_FAILED);
+ return;
+ }
+ }
+
plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
if (scanNode != null) {
@@ -218,25 +232,27 @@ public class Task {
}
public void init() throws IOException {
- // initialize a task temporal dir
- localFS.mkdirs(taskDir);
-
- if (request.getFetches().size() > 0) {
- inputTableBaseDir = localFS.makeQualified(
- lDirAllocator.getLocalPathForWrite(
- getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
- localFS.mkdirs(inputTableBaseDir);
- Path tableDir;
- for (String inputTable : context.getInputTables()) {
- tableDir = new Path(inputTableBaseDir, inputTable);
- if (!localFS.exists(tableDir)) {
- LOG.info("the directory is created " + tableDir.toUri());
- localFS.mkdirs(tableDir);
+ if (context.getState() == TaskAttemptState.TA_PENDING) {
+ // initialize a task temporal dir
+ localFS.mkdirs(taskDir);
+
+ if (request.getFetches().size() > 0) {
+ inputTableBaseDir = localFS.makeQualified(
+ lDirAllocator.getLocalPathForWrite(
+ getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
+ localFS.mkdirs(inputTableBaseDir);
+ Path tableDir;
+ for (String inputTable : context.getInputTables()) {
+ tableDir = new Path(inputTableBaseDir, inputTable);
+ if (!localFS.exists(tableDir)) {
+ LOG.info("the directory is created " + tableDir.toUri());
+ localFS.mkdirs(tableDir);
+ }
}
}
+ // for localizing the intermediate data
+ localize(request);
}
- // for localizing the intermediate data
- localize(request);
}
public QueryUnitAttemptId getTaskId() {
@@ -426,14 +442,21 @@ public class Task {
while(!killed && executor.next() != null) {
}
- this.executor.close();
- reloadInputStats();
- this.executor = null;
} catch (Exception e) {
error = e ;
LOG.error(e.getMessage(), e);
aborted = true;
} finally {
+ if (executor != null) {
+ try {
+ executor.close();
+ reloadInputStats();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ this.executor = null;
+ }
+
context.setProgress(1.0f);
taskRunnerContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 6cb4bd7..d27fd6d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -26,8 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.query.QueryContext;
@@ -75,16 +77,23 @@ public class TaskAttemptContext {
private Enforcer enforcer;
private QueryContext queryContext;
private WorkerContext workerContext;
+ private ExecutionBlockSharedResource sharedResource;
/** a output volume for each partition */
private Map<Integer, Long> partitionOutputVolume;
private HashShuffleAppenderManager hashShuffleAppenderManager;
- public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, final QueryUnitAttemptId queryId,
+ public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext,
+ final QueryUnitAttemptId queryId,
final FragmentProto[] fragments,
final Path workDir) {
this.queryContext = queryContext;
- this.workerContext = workerContext;
+
+ if (workerContext != null) { // For unit tests
+ this.workerContext = workerContext;
+ this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId());
+ }
+
this.queryId = queryId;
if (fragments != null) {
@@ -154,6 +163,23 @@ public class TaskAttemptContext {
return this.enforcer;
}
+ public ExecutionBlockSharedResource getSharedResource() {
+ return sharedResource;
+ }
+
+ public EvalNode compileEval(Schema schema, EvalNode eval) {
+ return sharedResource.compileEval(schema, eval);
+ }
+
+ public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) {
+ if (sharedResource != null) {
+ return sharedResource.getPreCompiledEval(schema, eval);
+ } else {
+ LOG.debug("Shared resource is not initialized. It is NORMAL in unit tests");
+ return eval;
+ }
+ }
+
public boolean hasResultStats() {
return resultStats != null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index cdb1438..e100c48 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -207,6 +207,9 @@ message RunExecutionBlockRequestProto {
required string nodeId = 4;
required string containerId = 5;
optional string queryOutputPath = 6;
+
+ required KeyValueSetProto queryContext = 7;
+ required string planJson = 8;
}
message ExecutionBlockListProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 271ba70..4d9ca67 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -33,6 +33,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.TajoIdUtils;
@@ -66,12 +67,15 @@ public class LocalTajoTestingUtility {
public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
}
+
public static Session createDummySession() {
return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME);
}
public static QueryContext createDummyContext(TajoConf conf) {
- return new QueryContext(conf, createDummySession());
+ QueryContext context = new QueryContext(conf, createDummySession());
+ context.putAll(CommonTestingUtil.getSessionVarsForTest().getAllKeyValus());
+ return context;
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 7b87112..346fa69 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -92,9 +92,16 @@ public class TajoTestingCluster {
public TajoTestingCluster(boolean masterHaEMode) {
this.conf = new TajoConf();
this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
+
+ setTestingFlagProperties();
initPropertiesAndConfigs();
}
+ void setTestingFlagProperties() {
+ System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+ conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+ }
+
void initPropertiesAndConfigs() {
if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
@@ -106,7 +113,6 @@ public class TajoTestingCluster {
this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
.indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
- conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
}
public TajoConf getConfiguration() {