You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:58 UTC
[36/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
new file mode 100644
index 0000000..1ee0878
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.ObjectArrays;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Stack;
+
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVerifier.Context, Expr> {
+ private CatalogService catalog;
+
+ public PreLogicalPlanVerifier(CatalogService catalog) {
+ this.catalog = catalog;
+ }
+
+ public static class Context {
+ Session session;
+ VerificationState state;
+
+ public Context(Session session, VerificationState state) {
+ this.session = session;
+ this.state = state;
+ }
+ }
+
+ public VerificationState verify(Session session, VerificationState state, Expr expr) throws PlanningException {
+ Context context = new Context(session, state);
+ visit(context, new Stack<Expr>(), expr);
+ return context.state;
+ }
+
+ public Expr visitProjection(Context context, Stack<Expr> stack, Projection expr) throws PlanningException {
+ super.visitProjection(context, stack, expr);
+
+ Set<String> names = TUtil.newHashSet();
+ Expr [] distinctValues = null;
+
+ for (NamedExpr namedExpr : expr.getNamedExprs()) {
+
+ if (namedExpr.hasAlias()) {
+ if (names.contains(namedExpr.getAlias())) {
+ context.state.addVerification(String.format("column name \"%s\" specified more than once",
+ namedExpr.getAlias()));
+ } else {
+ names.add(namedExpr.getAlias());
+ }
+ }
+
+ // no two aggregations can have different DISTINCT columns.
+ //
+ // For example, the following query will work
+ // SELECT count(DISTINCT col1) and sum(DISTINCT col1) ..
+ //
+ // But, the following query will not work in this time
+ //
+ // SELECT count(DISTINCT col1) and SUM(DISTINCT col2) ..
+ Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
+ if (exprs.size() > 0) {
+ for (GeneralSetFunctionExpr setFunction : exprs) {
+ if (distinctValues == null && setFunction.isDistinct()) {
+ distinctValues = setFunction.getParams();
+ } else if (distinctValues != null && setFunction.isDistinct()) {
+ if (!Arrays.equals(distinctValues, setFunction.getParams())) {
+ Expr [] differences = ObjectArrays.concat(distinctValues, setFunction.getParams(), Expr.class);
+ throw new PlanningException("different DISTINCT columns are not supported yet: "
+ + TUtil.arrayToString(differences));
+ }
+ }
+ }
+ }
+
+ // Currently, avg functions with distinct aggregation are not supported.
+ // This code does not allow users to use avg functions with distinct aggregation.
+ if (distinctValues != null) {
+ for (GeneralSetFunctionExpr setFunction : exprs) {
+ if (setFunction.getSignature().equalsIgnoreCase("avg")) {
+ if (setFunction.isDistinct()) {
+ throw new PlanningException("avg(distinct) function is not supported yet.");
+ } else {
+ throw new PlanningException("avg() function with distinct aggregation functions is not supported yet.");
+ }
+ }
+ }
+ }
+ }
+ return expr;
+ }
+
+ @Override
+ public Expr visitGroupBy(Context context, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+ super.visitGroupBy(context, stack, expr);
+
+ // Enforcer only ordinary grouping set.
+ for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
+ if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
+ context.state.addVerification(groupingElement.getType() + " is not supported yet");
+ }
+ }
+
+ Projection projection = null;
+ for (Expr parent : stack) {
+ if (parent.getType() == OpType.Projection) {
+ projection = (Projection) parent;
+ break;
+ }
+ }
+
+ if (projection == null) {
+ throw new PlanningException("No Projection");
+ }
+
+ return expr;
+ }
+
+ @Override
+ public Expr visitRelation(Context context, Stack<Expr> stack, Relation expr) throws PlanningException {
+ assertRelationExistence(context, expr.getName());
+ return expr;
+ }
+
+ private boolean assertRelationExistence(Context context, String tableName) {
+ String qualifiedName;
+
+ if (CatalogUtil.isFQTableName(tableName)) {
+ qualifiedName = tableName;
+ } else {
+ qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+ }
+
+ if (!catalog.existsTable(qualifiedName)) {
+ context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
+ return false;
+ }
+ return true;
+ }
+
+ private boolean assertRelationNoExistence(Context context, String tableName) {
+ String qualifiedName;
+
+ if (CatalogUtil.isFQTableName(tableName)) {
+ qualifiedName = tableName;
+ } else {
+ qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+ }
+ if (catalog.existsTable(qualifiedName)) {
+ context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName));
+ return false;
+ }
+ return true;
+ }
+
+ private boolean assertUnsupportedStoreType(VerificationState state, String name) {
+ if (name != null && name.equals(CatalogProtos.StoreType.RAW.name())) {
+ state.addVerification(String.format("Unsupported store type :%s", name));
+ return false;
+ }
+ return true;
+ }
+
+ private boolean assertDatabaseExistence(VerificationState state, String name) {
+ if (!catalog.existDatabase(name)) {
+ state.addVerification(String.format("database \"%s\" does not exist", name));
+ return false;
+ }
+ return true;
+ }
+
+ private boolean assertDatabaseNoExistence(VerificationState state, String name) {
+ if (catalog.existDatabase(name)) {
+ state.addVerification(String.format("database \"%s\" already exists", name));
+ return false;
+ }
+ return true;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Data Definition Language Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+ @Override
+ public Expr visitCreateDatabase(Context context, Stack<Expr> stack, CreateDatabase expr)
+ throws PlanningException {
+ super.visitCreateDatabase(context, stack, expr);
+ if (!expr.isIfNotExists()) {
+ assertDatabaseNoExistence(context.state, expr.getDatabaseName());
+ }
+ return expr;
+ }
+
+ @Override
+ public Expr visitDropDatabase(Context context, Stack<Expr> stack, DropDatabase expr) throws PlanningException {
+ super.visitDropDatabase(context, stack, expr);
+ if (!expr.isIfExists()) {
+ assertDatabaseExistence(context.state, expr.getDatabaseName());
+ }
+ return expr;
+ }
+
+ @Override
+ public Expr visitCreateTable(Context context, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+ super.visitCreateTable(context, stack, expr);
+ if (!expr.isIfNotExists()) {
+ assertRelationNoExistence(context, expr.getTableName());
+ }
+ assertUnsupportedStoreType(context.state, expr.getStorageType());
+ return expr;
+ }
+
+ @Override
+ public Expr visitDropTable(Context context, Stack<Expr> stack, DropTable expr) throws PlanningException {
+ super.visitDropTable(context, stack, expr);
+ if (!expr.isIfExists()) {
+ assertRelationExistence(context, expr.getTableName());
+ }
+ return expr;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Insert or Update Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public Expr visitInsert(Context context, Stack<Expr> stack, Insert expr) throws PlanningException {
+ Expr child = super.visitInsert(context, stack, expr);
+
+ if (!expr.isOverwrite()) {
+ context.state.addVerification("INSERT INTO statement is not supported yet.");
+ }
+
+ if (expr.hasTableName()) {
+ assertRelationExistence(context, expr.getTableName());
+ }
+
+ if (child != null && child.getType() == OpType.Projection) {
+ if (expr.hasTargetColumns()) {
+ Projection projection = (Projection) child;
+ int projectColumnNum = projection.getNamedExprs().length;
+ int targetColumnNum = expr.getTargetColumns().length;
+
+ if (targetColumnNum > projectColumnNum) {
+ context.state.addVerification("INSERT has more target columns than expressions");
+ } else if (targetColumnNum < projectColumnNum) {
+ context.state.addVerification("INSERT has more expressions than target columns");
+ }
+ }
+ }
+
+ return expr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
new file mode 100644
index 0000000..161d39b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.storage.Tuple;
+
+public class Projector {
+ private final Schema inSchema;
+
+ // for projection
+ private final int targetNum;
+ private final EvalNode[] evals;
+
+ public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+ this.inSchema = inSchema;
+ if (targets == null) {
+ targets = PlannerUtil.schemaToTargets(outSchema);
+ }
+ this.targetNum = targets.length;
+ evals = new EvalNode[targetNum];
+ for (int i = 0; i < targetNum; i++) {
+ evals[i] = targets[i].getEvalTree();
+ }
+ }
+
+ public void eval(Tuple in, Tuple out) {
+ if (targetNum > 0) {
+ for (int i = 0; i < evals.length; i++) {
+ out.put(i, evals[i].eval(inSchema, in));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
new file mode 100644
index 0000000..a3522c7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+import java.math.BigDecimal;
+
+public abstract class RangePartitionAlgorithm {
+ protected SortSpec [] sortSpecs;
+ protected TupleRange range;
+ protected final BigDecimal totalCard;
+ /** true if the end of the range is inclusive. Otherwise, it should be false. */
+ protected final boolean inclusive;
+
+ /**
+ *
+ * @param sortSpecs The array of sort keys
+ * @param totalRange The total range to be partition
+ * @param inclusive true if the end of the range is inclusive. Otherwise, false.
+ */
+ public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
+ this.sortSpecs = sortSpecs;
+ this.range = totalRange;
+ this.inclusive = inclusive;
+ this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
+ }
+
+ /**
+ * It computes the value cardinality of a tuple range.
+ *
+ * @param dataType
+ * @param start
+ * @param end
+ * @return
+ */
+ public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
+ boolean inclusive, boolean isAscending) {
+ BigDecimal columnCard;
+
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ columnCard = new BigDecimal(2);
+ break;
+ case CHAR:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asChar() - start.asChar());
+ } else {
+ columnCard = new BigDecimal(start.asChar() - end.asChar());
+ }
+ break;
+ case BIT:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asByte() - start.asByte());
+ } else {
+ columnCard = new BigDecimal(start.asByte() - end.asByte());
+ }
+ break;
+ case INT2:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+ } else {
+ columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+ }
+ break;
+ case INT4:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
+ break;
+ case INT8:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
+ break;
+ case FLOAT4:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
+ break;
+ case FLOAT8:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
+ break;
+ case TEXT:
+ final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0);
+ final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0);
+ if (isAscending) {
+ columnCard = new BigDecimal(textEnd - textStart);
+ } else {
+ columnCard = new BigDecimal(textStart - textEnd);
+ }
+ break;
+ case DATE:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
+ break;
+ case TIME:
+ case TIMESTAMP:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+ } else {
+ columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+ }
+ break;
+ case INET4:
+ if (isAscending) {
+ columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+ } else {
+ columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(dataType + " is not supported yet");
+ }
+
+ return inclusive ? columnCard.add(new BigDecimal(1)).abs() : columnCard.abs();
+ }
+
+ /**
+ * It computes the value cardinality of a tuple range.
+ * @return
+ */
+ public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
+ Tuple start = range.getStart();
+ Tuple end = range.getEnd();
+ Column col;
+
+ BigDecimal cardinality = new BigDecimal(1);
+ BigDecimal columnCard;
+ for (int i = 0; i < sortSpecs.length; i++) {
+ columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+ sortSpecs[i].isAscending());
+
+ if (new BigDecimal(0).compareTo(columnCard) < 0) {
+ cardinality = cardinality.multiply(columnCard);
+ }
+ }
+
+ return cardinality;
+ }
+
+ public BigDecimal getTotalCardinality() {
+ return totalCard;
+ }
+
+ /**
+ *
+ * @param partNum the number of desired partitions, but it may return the less partitions.
+ * @return the end of intermediate ranges are exclusive, and the end of final range is inclusive.
+ */
+ public abstract TupleRange[] partition(int partNum);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
new file mode 100644
index 0000000..bae6e4a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+
+import java.util.Stack;
+
+/**
+ * <code>SimpleAlgebraVisitor</code> provides a simple and fewer visit methods. It makes building concrete class easier.
+ */
+public abstract class SimpleAlgebraVisitor<CONTEXT, RESULT> extends BaseAlgebraVisitor<CONTEXT, RESULT> {
+
+ public RESULT visit(CONTEXT ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
+ RESULT result = null;
+ if (expr instanceof UnaryOperator) {
+ preHook(ctx, stack, expr);
+ result = visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
+ postHook(ctx, stack, expr, result);
+ } else if (expr instanceof BinaryOperator) {
+ preHook(ctx, stack, expr);
+ result = visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
+ postHook(ctx, stack, expr, result);
+ } else {
+ result = super.visit(ctx, stack, expr);
+ }
+
+ return result;
+ }
+
+ public RESULT visitUnaryOperator(CONTEXT ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
+ stack.push(expr);
+ RESULT result = visit(ctx, stack, expr.getChild());
+ stack.pop();
+ return result;
+ }
+
+ public RESULT visitBinaryOperator(CONTEXT ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+ stack.push(expr);
+ visit(ctx, stack, expr.getLeft());
+ RESULT result = visit(ctx, stack, expr.getRight());
+ stack.pop();
+ return result;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Relational Operator Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
+ return super.visitProjection(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitLimit(CONTEXT ctx, Stack<Expr> stack, Limit expr) throws PlanningException {
+ return super.visitLimit(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitSort(CONTEXT ctx, Stack<Expr> stack, Sort expr) throws PlanningException {
+ return super.visitSort(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitHaving(CONTEXT ctx, Stack<Expr> stack, Having expr) throws PlanningException {
+ return super.visitHaving(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitGroupBy(CONTEXT ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+ return super.visitGroupBy(ctx, stack, expr);
+ }
+
+ public RESULT visitFilter(CONTEXT ctx, Stack<Expr> stack, Selection expr) throws PlanningException {
+ return super.visitFilter(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitJoin(CONTEXT ctx, Stack<Expr> stack, Join expr) throws PlanningException {
+ return super.visitJoin(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitTableSubQuery(CONTEXT ctx, Stack<Expr> stack, TablePrimarySubQuery expr) throws PlanningException {
+ return super.visitTableSubQuery(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitRelationList(CONTEXT ctx, Stack<Expr> stack, RelationList expr) throws PlanningException {
+ return super.visitRelationList(ctx, stack, expr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Data Definition Language Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitCreateTable(CONTEXT ctx, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+ return super.visitCreateTable(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitDropTable(CONTEXT ctx, Stack<Expr> stack, DropTable expr) throws PlanningException {
+ return super.visitDropTable(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitAlterTable(CONTEXT ctx, Stack<Expr> stack, AlterTable expr) throws PlanningException {
+ return super.visitAlterTable(ctx, stack, expr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Insert or Update Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ @Override
+ public RESULT visitInsert(CONTEXT ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+ return super.visitInsert(ctx, stack, expr);
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Other Predicates Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitBetween(CONTEXT ctx, Stack<Expr> stack, BetweenPredicate expr) throws PlanningException {
+ return super.visitBetween(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitCaseWhen(CONTEXT ctx, Stack<Expr> stack, CaseWhenPredicate expr) throws PlanningException {
+ return super.visitCaseWhen(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitValueListExpr(CONTEXT ctx, Stack<Expr> stack, ValueListExpr expr) throws PlanningException {
+ return super.visitValueListExpr(ctx, stack, expr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Other Expressions
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitFunction(CONTEXT ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+ return super.visitFunction(ctx, stack, expr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // General Set Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitCountRowsFunction(CONTEXT ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
+ throws PlanningException {
+ return super.visitCountRowsFunction(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitGeneralSetFunction(CONTEXT ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+ throws PlanningException {
+ return super.visitGeneralSetFunction(ctx, stack, expr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Literal Section
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public RESULT visitDataType(CONTEXT ctx, Stack<Expr> stack, DataTypeExpr expr) throws PlanningException {
+ return super.visitDataType(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitCastExpr(CONTEXT ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+ return super.visitCastExpr(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitLiteral(CONTEXT ctx, Stack<Expr> stack, LiteralValue expr) throws PlanningException {
+ return super.visitLiteral(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitNullLiteral(CONTEXT ctx, Stack<Expr> stack, NullLiteral expr) throws PlanningException {
+ return super.visitNullLiteral(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitTimestampLiteral(CONTEXT ctx, Stack<Expr> stack, TimestampLiteral expr) throws PlanningException {
+ return super.visitTimestampLiteral(ctx, stack, expr);
+ }
+
+ @Override
+ public RESULT visitTimeLiteral(CONTEXT ctx, Stack<Expr> stack, TimeLiteral expr) throws PlanningException {
+ return super.visitTimeLiteral(ctx, stack, expr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
new file mode 100644
index 0000000..6a16d3c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * A Target contains how to evaluate an expression and its alias name.
+ */
+public class Target implements Cloneable, GsonObject {
+ @Expose private EvalNode expr;
+ @Expose private Column column;
+ @Expose private String alias = null;
+
+ public Target(FieldEval fieldEval) {
+ this.expr = fieldEval;
+ this.column = fieldEval.getColumnRef();
+ }
+
+ public Target(final EvalNode eval, final String alias) {
+ this.expr = eval;
+ // force lower case
+ String normalized = alias;
+
+ // If an expr is a column reference and its alias is equivalent to column name, ignore a given alias.
+ if (eval instanceof FieldEval && eval.getName().equals(normalized)) {
+ column = ((FieldEval) eval).getColumnRef();
+ } else {
+ column = new Column(normalized, eval.getValueType());
+ setAlias(alias);
+ }
+ }
+
+ public String getCanonicalName() {
+ return !hasAlias() ? column.getQualifiedName() : alias;
+ }
+
+ public final void setExpr(EvalNode expr) {
+ this.expr = expr;
+ }
+
+ public final void setAlias(String alias) {
+ this.alias = alias;
+ this.column = new Column(alias, expr.getValueType());
+ }
+
+ public final String getAlias() {
+ return alias;
+ }
+
+ public final boolean hasAlias() {
+ return alias != null;
+ }
+
+ public DataType getDataType() {
+ return column.getDataType();
+ }
+
+ public <T extends EvalNode> T getEvalTree() {
+ return (T) this.expr;
+ }
+
+ public Column getNamedColumn() {
+ return this.column;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder(expr.toString());
+ if(hasAlias()) {
+ sb.append(" as ").append(alias);
+ }
+ return sb.toString();
+ }
+
+ public boolean equals(Object obj) {
+ if(obj instanceof Target) {
+ Target other = (Target) obj;
+
+ boolean b1 = expr.equals(other.expr);
+ boolean b2 = column.equals(other.column);
+ boolean b3 = TUtil.checkEquals(alias, other.alias);
+
+ return b1 && b2 && b3;
+ } else {
+ return false;
+ }
+ }
+
+ public int hashCode() {
+ return this.expr.getName().hashCode();
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ Target target = (Target) super.clone();
+ target.expr = (EvalNode) expr.clone();
+ target.column = column;
+ target.alias = alias != null ? alias : null;
+
+ return target;
+ }
+
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, Target.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
new file mode 100644
index 0000000..f6922ed
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.exception.RangeOverflowException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.Bytes;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+
+
+public class UniformRangePartition extends RangePartitionAlgorithm {
+ private int variableId;
+ private BigDecimal[] cardForEachDigit;
+ private BigDecimal[] colCards;
+
+ /**
+ *
+ * @param totalRange
+ * @param sortSpecs The description of sort keys
+ * @param inclusive true if the end of the range is inclusive
+ */
+ public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+ super(sortSpecs, totalRange, inclusive);
+ colCards = new BigDecimal[sortSpecs.length];
+ for (int i = 0; i < sortSpecs.length; i++) {
+ colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
+ totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
+ }
+
+ cardForEachDigit = new BigDecimal[colCards.length];
+ for (int i = 0; i < colCards.length ; i++) {
+ if (i == 0) {
+ cardForEachDigit[i] = colCards[i];
+ } else {
+ cardForEachDigit[i] = cardForEachDigit[i - 1].multiply(colCards[i]);
+ }
+ }
+ }
+
+ public UniformRangePartition(TupleRange range, SortSpec [] sortSpecs) {
+ this(range, sortSpecs, true);
+ }
+
+ @Override
+ public TupleRange[] partition(int partNum) {
+ Preconditions.checkArgument(partNum > 0,
+ "The number of partitions must be positive, but the given number: "
+ + partNum);
+ Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
+ "the number of partition cannot exceed total cardinality (" + totalCard + ")");
+
+ int varId;
+ for (varId = 0; varId < cardForEachDigit.length; varId++) {
+ if (cardForEachDigit[varId].compareTo(new BigDecimal(partNum)) >= 0)
+ break;
+ }
+ this.variableId = varId;
+
+ BigDecimal [] reverseCardsForDigit = new BigDecimal[variableId+1];
+ for (int i = variableId; i >= 0; i--) {
+ if (i == variableId) {
+ reverseCardsForDigit[i] = colCards[i];
+ } else {
+ reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+ }
+ }
+
+ List<TupleRange> ranges = Lists.newArrayList();
+ BigDecimal term = reverseCardsForDigit[0].divide(
+ new BigDecimal(partNum), RoundingMode.CEILING);
+ BigDecimal reminder = reverseCardsForDigit[0];
+ Tuple last = range.getStart();
+ while(reminder.compareTo(new BigDecimal(0)) > 0) {
+ if (reminder.compareTo(term) <= 0) { // final one is inclusive
+ ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
+ } else {
+ Tuple next = increment(last, term.longValue(), variableId);
+ ranges.add(new TupleRange(sortSpecs, last, next));
+ }
+ last = ranges.get(ranges.size() - 1).getEnd();
+ reminder = reminder.subtract(term);
+ }
+
+ return ranges.toArray(new TupleRange[ranges.size()]);
+ }
+
+ /**
+ * Check whether an overflow occurs or not.
+ *
+ * @param colId The column id to be checked
+ * @param last
+ * @param inc
+ * @param sortSpecs
+ * @return
+ */
+ public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+ Column column = sortSpecs[colId].getSortKey();
+ BigDecimal candidate;
+ boolean overflow = false;
+
+ switch (column.getDataType().getType()) {
+ case BIT: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asByte()));
+ return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asByte()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+ }
+ }
+ case CHAR: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal((int)last.asChar()));
+ return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal((int)last.asChar()).subtract(inc);
+ return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+ }
+ }
+ case INT2: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt2()));
+ return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt2()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+ }
+ }
+ case DATE:
+ case INT4: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt4()));
+ return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt4()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+ }
+ }
+ case TIME:
+ case TIMESTAMP:
+ case INT8: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asInt8()));
+ return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asInt8()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+ }
+ }
+ case FLOAT4: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asFloat4()));
+ return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asFloat4()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+ }
+ }
+ case FLOAT8: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal(last.asFloat8()));
+ return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal(last.asFloat8()).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+ }
+
+ }
+ case TEXT: {
+ if (sortSpecs[colId].isAscending()) {
+ candidate = inc.add(new BigDecimal((int)(last instanceof NullDatum ? '0' : last.asChars().charAt(0))));
+ return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+ } else {
+ candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
+ return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+ }
+ }
+ case INET4: {
+ int candidateIntVal;
+ byte[] candidateBytesVal = new byte[4];
+ if (sortSpecs[colId].isAscending()) {
+ candidateIntVal = inc.intValue() + last.asInt4();
+ if (candidateIntVal - inc.intValue() != last.asInt4()) {
+ return true;
+ }
+ Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
+ return Bytes.compareTo(range.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0;
+ } else {
+ candidateIntVal = last.asInt4() - inc.intValue();
+ if (candidateIntVal + inc.intValue() != last.asInt4()) {
+ return true;
+ }
+ Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
+ return Bytes.compareTo(candidateBytesVal, range.getEnd().get(colId).asByteArray()) < 0;
+ }
+ }
+ }
+ return overflow;
+ }
+
+ public long incrementAndGetReminder(int colId, Datum last, long inc) {
+ Column column = sortSpecs[colId].getSortKey();
+ long reminder = 0;
+ switch (column.getDataType().getType()) {
+ case BIT: {
+ long candidate = last.asByte() + inc;
+ byte end = range.getEnd().get(colId).asByte();
+ reminder = candidate - end;
+ break;
+ }
+ case CHAR: {
+ long candidate = last.asChar() + inc;
+ char end = range.getEnd().get(colId).asChar();
+ reminder = candidate - end;
+ break;
+ }
+ case DATE:
+ case INT4: {
+ int candidate = (int) (last.asInt4() + inc);
+ int end = range.getEnd().get(colId).asInt4();
+ reminder = candidate - end;
+ break;
+ }
+ case TIME:
+ case TIMESTAMP:
+ case INT8:
+ case INET4: {
+ long candidate = last.asInt8() + inc;
+ long end = range.getEnd().get(colId).asInt8();
+ reminder = candidate - end;
+ break;
+ }
+ case FLOAT4: {
+ float candidate = last.asFloat4() + inc;
+ float end = range.getEnd().get(colId).asFloat4();
+ reminder = (long) (candidate - end);
+ break;
+ }
+ case FLOAT8: {
+ double candidate = last.asFloat8() + inc;
+ double end = range.getEnd().get(colId).asFloat8();
+ reminder = (long) Math.ceil(candidate - end);
+ break;
+ }
+ case TEXT: {
+ char candidate = ((char)(last.asChars().charAt(0) + inc));
+ char end = range.getEnd().get(colId).asChars().charAt(0);
+ reminder = (char) (candidate - end);
+ break;
+ }
+ }
+
+ // including zero
+ return reminder - 1;
+ }
+
+ /**
+ *
+ * @param last
+ * @param inc
+ * @return
+ */
+ public Tuple increment(final Tuple last, final long inc, final int baseDigit) {
+ BigDecimal [] incs = new BigDecimal[last.size()];
+ boolean [] overflowFlag = new boolean[last.size()];
+ BigDecimal [] result;
+ BigDecimal value = new BigDecimal(inc);
+
+ BigDecimal [] reverseCardsForDigit = new BigDecimal[baseDigit + 1];
+ for (int i = baseDigit; i >= 0; i--) {
+ if (i == baseDigit) {
+ reverseCardsForDigit[i] = colCards[i];
+ } else {
+ reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+ }
+ }
+
+ for (int i = 0; i < baseDigit; i++) {
+ result = value.divideAndRemainder(reverseCardsForDigit[i + 1]);
+ incs[i] = result[0];
+ value = result[1];
+ }
+ int finalId = baseDigit;
+ incs[finalId] = value;
+ for (int i = finalId; i >= 0; i--) {
+ if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
+ if (i == 0) {
+ throw new RangeOverflowException(range, last, incs[i].longValue());
+ }
+ long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
+ incs[i] = new BigDecimal(rem);
+ incs[i - 1] = incs[i-1].add(new BigDecimal(1));
+ overflowFlag[i] = true;
+ } else {
+ if (i > 0) {
+ incs[i] = value;
+ break;
+ }
+ }
+ }
+
+ for (int i = 0; i < incs.length; i++) {
+ if (incs[i] == null) {
+ incs[i] = new BigDecimal(0);
+ }
+ }
+
+ Tuple end = new VTuple(sortSpecs.length);
+ Column column;
+ for (int i = 0; i < last.size(); i++) {
+ column = sortSpecs[i].getSortKey();
+ switch (column.getDataType().getType()) {
+ case CHAR:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+ }
+ break;
+ case BIT:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createBit(
+ (byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+ }
+ break;
+ case INT2:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createInt2(
+ (short) (range.getStart().get(i).asInt2() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+ }
+ break;
+ case INT4:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createInt4(
+ (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+ } else {
+ if (sortSpecs[i].isAscending()) {
+ end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+ }
+ }
+ break;
+ case INT8:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createInt8(
+ range.getStart().get(i).asInt8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+ }
+ break;
+ case FLOAT4:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createFloat4(
+ range.getStart().get(i).asFloat4() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+ }
+ break;
+ case FLOAT8:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createFloat8(
+ range.getStart().get(i).asFloat8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+ }
+ break;
+ case TEXT:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0)
+ + incs[i].longValue())) + ""));
+ } else {
+ end.put(i, DatumFactory.createText(
+ ((char) ((last.get(i) instanceof NullDatum ? '0': last.get(i).asChars().charAt(0)) + incs[i].longValue())) + ""));
+ }
+ break;
+ case DATE:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+ } else {
+ end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+ }
+ break;
+ case TIME:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+ }
+ break;
+ case TIMESTAMP:
+ if (overflowFlag[i]) {
+ end.put(i, DatumFactory.createTimeStampFromMillis(
+ range.getStart().get(i).asInt8() + incs[i].longValue()));
+ } else {
+ end.put(i, DatumFactory.createTimeStampFromMillis(last.get(i).asInt8() + incs[i].longValue()));
+ }
+ break;
+ case INET4:
+ byte[] ipBytes;
+ if (overflowFlag[i]) {
+ ipBytes = range.getStart().get(i).asByteArray();
+ assert ipBytes.length == 4;
+ end.put(i, DatumFactory.createInet4(ipBytes));
+ } else {
+ int lastVal = last.get(i).asInt4() + incs[i].intValue();
+ ipBytes = new byte[4];
+ Bytes.putInt(ipBytes, 0, lastVal);
+ end.put(i, DatumFactory.createInet4(ipBytes));
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
+ }
+ }
+
+ return end;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
new file mode 100644
index 0000000..18882b8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+
+public class VerificationState {
+ private static final Log LOG = LogFactory.getLog(VerificationState.class);
+ List<String> errorMessages = Lists.newArrayList();
+
+ public void addVerification(String error) {
+ LOG.warn(TUtil.getCurrentCodePoint(1) + " causes: " + error);
+ errorMessages.add(error);
+ }
+
+ public boolean verified() {
+ return errorMessages.size() == 0;
+ }
+
+ public List<String> getErrorMessages() {
+ return errorMessages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
new file mode 100644
index 0000000..91190f6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.enforce;
+
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+
+public class Enforcer implements ProtoObject<EnforcerProto> {
+ Map<EnforceType, List<EnforceProperty>> properties;
+ private EnforcerProto proto;
+
+ @SuppressWarnings("unused")
+ public Enforcer() {
+ properties = TUtil.newHashMap();
+ }
+
+ public Enforcer(EnforcerProto proto) {
+ this.proto = proto;
+ }
+
+ private EnforceProperty.Builder newProperty() {
+ return EnforceProperty.newBuilder();
+ }
+
+ private void initProperties() {
+ if (properties == null) {
+ properties = TUtil.newHashMap();
+ for (EnforceProperty property : proto.getPropertiesList()) {
+ TUtil.putToNestedList(properties, property.getType(), property);
+ }
+ }
+ }
+
+ public boolean hasEnforceProperty(EnforceType type) {
+ initProperties();
+ return properties.containsKey(type);
+ }
+
+ public List<EnforceProperty> getEnforceProperties(EnforceType type) {
+ initProperties();
+ return properties.get(type);
+ }
+
+ public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
+ EnforceProperty.Builder builder = newProperty();
+ SortedInputEnforce.Builder enforce = SortedInputEnforce.newBuilder();
+ enforce.setTableName(tableName);
+ for (SortSpec sortSpec : sortSpecs) {
+ enforce.addSortSpecs(sortSpec.getProto());
+ }
+
+ builder.setType(EnforceType.SORTED_INPUT);
+ builder.setSortedInput(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addOutputDistinct() {
+ EnforceProperty.Builder builder = newProperty();
+ OutputDistinctEnforce.Builder enforce = OutputDistinctEnforce.newBuilder();
+
+ builder.setType(EnforceType.OUTPUT_DISTINCT);
+ builder.setOutputDistinct(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.JOIN);
+ builder.setJoin(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) {
+ EnforceProperty.Builder builder = newProperty();
+ GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION);
+ if (sortSpecs != null) {
+ for (SortSpec sortSpec : sortSpecs) {
+ enforce.addSortSpecs(sortSpec.getProto());
+ }
+ }
+
+ builder.setType(EnforceType.GROUP_BY);
+ builder.setGroupby(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceHashAggregation(int pid) {
+ EnforceProperty.Builder builder = newProperty();
+ GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION);
+
+ builder.setType(EnforceType.GROUP_BY);
+ builder.setGroupby(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ SortEnforce.Builder enforce = SortEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.SORT);
+ builder.setSort(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addBroadcast(String tableName) {
+ EnforceProperty.Builder builder = newProperty();
+ BroadcastEnforce.Builder enforce = BroadcastEnforce.newBuilder();
+ enforce.setTableName(tableName);
+
+ builder.setType(EnforceType.BROADCAST);
+ builder.setBroadcast(enforce);
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceColumnPartitionAlgorithm(int pid, ColumnPartitionAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ ColumnPartitionEnforcer.Builder enforce = ColumnPartitionEnforcer.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.COLUMN_PARTITION);
+ builder.setColumnPartition(enforce);
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public Collection<EnforceProperty> getProperties() {
+ if (proto != null) {
+ return proto.getPropertiesList();
+ } else {
+ List<EnforceProperty> list = TUtil.newList();
+ for (List<EnforceProperty> propertyList : properties.values()) {
+ list.addAll(propertyList);
+ }
+ return list;
+ }
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Enforce ").append(properties.size()).append(" properties: ");
+ boolean first = true;
+ for (EnforceType enforceType : properties.keySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(enforceType);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public EnforcerProto getProto() {
+ EnforcerProto.Builder builder = EnforcerProto.newBuilder();
+ builder.addAllProperties(getProperties());
+ return builder.build();
+ }
+
+ public static String toString(EnforceProperty property) {
+ StringBuilder sb = new StringBuilder();
+ switch (property.getType()) {
+ case GROUP_BY:
+ GroupbyEnforce groupby = property.getGroupby();
+ sb.append("type=GroupBy,alg=");
+ if (groupby.getAlgorithm() == GroupbyAlgorithm.HASH_AGGREGATION) {
+ sb.append("hash");
+ } else {
+ sb.append("sort");
+ sb.append(",keys=");
+ boolean first = true;
+ for (CatalogProtos.SortSpecProto sortSpec : groupby.getSortSpecsList()) {
+ if (first == true) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(sortSpec.getColumn().getName());
+ sb.append(" (").append(sortSpec.getAscending() ? "asc":"desc").append(")");
+ }
+ }
+ break;
+ case BROADCAST:
+ BroadcastEnforce broadcast = property.getBroadcast();
+ sb.append("type=Broadcast, tables=").append(broadcast.getTableName());
+ break;
+ case COLUMN_PARTITION:
+ ColumnPartitionEnforcer columnPartition = property.getColumnPartition();
+ sb.append("type=ColumnPartition, alg=");
+ if (columnPartition.getAlgorithm() == ColumnPartitionAlgorithm.SORT_PARTITION) {
+ sb.append("sort");
+ } else {
+ sb.append("hash");
+ }
+ break;
+ case JOIN:
+ JoinEnforce join = property.getJoin();
+ sb.append("type=Join,alg=");
+ if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.MERGE_JOIN) {
+ sb.append("merge_join");
+ } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.NESTED_LOOP_JOIN) {
+ sb.append("nested_loop");
+ } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) {
+ sb.append("block_nested_loop");
+ } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
+ sb.append("in_memory_hash");
+ }
+ break;
+ case OUTPUT_DISTINCT:
+ case SORT:
+ SortEnforce sort = property.getSort();
+ sb.append("type=Sort,alg=");
+ if (sort.getAlgorithm() == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+ sb.append("in-memory");
+ } else {
+ sb.append("external");
+ }
+ break;
+ case SORTED_INPUT:
+ }
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
new file mode 100644
index 0000000..b3b5bb0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class DataChannel {
+ private ExecutionBlockId srcId;
+ private ExecutionBlockId targetId;
+ private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
+ private ShuffleType shuffleType;
+ private Integer numOutputs = 1;
+ private Column[] shuffleKeys;
+
+ private Schema schema;
+
+ private StoreType storeType = StoreType.RAW;
+
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+ this.srcId = srcId;
+ this.targetId = targetId;
+ }
+
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType) {
+ this(srcId, targetId);
+ this.shuffleType = shuffleType;
+ }
+
+ public DataChannel(ExecutionBlock src, ExecutionBlock target, ShuffleType shuffleType, int numOutput) {
+ this(src.getId(), target.getId(), shuffleType, numOutput);
+ setSchema(src.getPlan().getOutSchema());
+ }
+
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType, int numOutputs) {
+ this(srcId, targetId, shuffleType);
+ this.numOutputs = numOutputs;
+ }
+
+ public DataChannel(DataChannelProto proto) {
+ this.srcId = new ExecutionBlockId(proto.getSrcId());
+ this.targetId = new ExecutionBlockId(proto.getTargetId());
+ this.transmitType = proto.getTransmitType();
+ this.shuffleType = proto.getShuffleType();
+ if (proto.hasSchema()) {
+ this.setSchema(new Schema(proto.getSchema()));
+ }
+ if (proto.getShuffleKeysCount() > 0) {
+ shuffleKeys = new Column[proto.getShuffleKeysCount()];
+ for (int i = 0; i < proto.getShuffleKeysCount(); i++) {
+ shuffleKeys[i] = new Column(proto.getShuffleKeys(i));
+ }
+ } else {
+ shuffleKeys = new Column[] {};
+ }
+ if (proto.hasNumOutputs()) {
+ this.numOutputs = proto.getNumOutputs();
+ }
+
+ if (proto.hasStoreType()) {
+ this.storeType = proto.getStoreType();
+ }
+ }
+
+ public ExecutionBlockId getSrcId() {
+ return srcId;
+ }
+
+ public ExecutionBlockId getTargetId() {
+ return targetId;
+ }
+
+ public ShuffleType getShuffleType() {
+ return shuffleType;
+ }
+
+ public TransmitType getTransmitType() {
+ return this.transmitType;
+ }
+
+ public void setTransmitType(TransmitType transmitType) {
+ this.transmitType = transmitType;
+ }
+
+ public void setShuffle(ShuffleType shuffleType, Column[] keys, int numOutputs) {
+ Preconditions.checkArgument(keys.length >= 0, "At least one shuffle key must be specified.");
+ Preconditions.checkArgument(numOutputs > 0, "The number of outputs must be positive: %s", numOutputs);
+
+ this.shuffleType = shuffleType;
+ this.shuffleKeys = keys;
+ this.numOutputs = numOutputs;
+ }
+
+ public void setShuffleType(ShuffleType shuffleType) {
+ this.shuffleType = shuffleType;
+ }
+
+ public boolean hasShuffleKeys() {
+ return shuffleKeys != null;
+ }
+
+ public void setShuffleKeys(Column[] key) {
+ this.shuffleKeys = key;
+ }
+
+ public Column [] getShuffleKeys() {
+ return this.shuffleKeys;
+ }
+
+ public void setShuffleOutputNum(int partNum) {
+ this.numOutputs = partNum;
+ }
+
+ public int getShuffleOutputNum() {
+ return numOutputs;
+ }
+
+ public boolean hasStoreType() {
+ return this.storeType != null;
+ }
+
+ public void setStoreType(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ public StoreType getStoreType() {
+ return storeType;
+ }
+
+ public DataChannelProto getProto() {
+ DataChannelProto.Builder builder = DataChannelProto.newBuilder();
+ builder.setSrcId(srcId.getProto());
+ builder.setTargetId(targetId.getProto());
+ if (transmitType != null) {
+ builder.setTransmitType(transmitType);
+ }
+ builder.setShuffleType(shuffleType);
+ if (schema != null) {
+ builder.setSchema(schema.getProto());
+ }
+ if (shuffleKeys != null) {
+ for (Column column : shuffleKeys) {
+ builder.addShuffleKeys(column.getProto());
+ }
+ }
+ if (numOutputs != null) {
+ builder.setNumOutputs(numOutputs);
+ }
+
+ if(storeType != null){
+ builder.setStoreType(storeType);
+ }
+ return builder.build();
+ }
+
+ public void setSchema(Schema schema) {
+ this.schema = SchemaUtil.clone(schema);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[").append(srcId.getQueryId()).append("] ");
+ sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+ sb.append(" (type=").append(shuffleType);
+ if (hasShuffleKeys()) {
+ sb.append(", key=");
+ sb.append(TUtil.arrayToString(shuffleKeys));
+ sb.append(", num=").append(numOutputs);
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
new file mode 100644
index 0000000..7df6b43
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+/**
+ * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
+ * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
+ * An ExecutionBlock class contains input information (e.g., child execution blocks or input
+ * tables), and output information (e.g., partition type, partition key, and partition number).
+ * In addition, it includes a logical plan to be executed in each node.
+ */
+public class ExecutionBlock {
+ private ExecutionBlockId executionBlockId;
+ private LogicalNode plan = null;
+ private StoreTableNode store = null;
+ private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+ private Enforcer enforcer = new Enforcer();
+
+ private boolean hasJoinPlan;
+ private boolean hasUnionPlan;
+
+ private Set<String> broadcasted = new HashSet<String>();
+
+ public ExecutionBlock(ExecutionBlockId executionBlockId) {
+ this.executionBlockId = executionBlockId;
+ }
+
+ public ExecutionBlockId getId() {
+ return executionBlockId;
+ }
+
+ public void setPlan(LogicalNode plan) {
+ hasJoinPlan = false;
+ hasUnionPlan = false;
+ this.scanlist.clear();
+ this.plan = plan;
+
+ if (plan == null) {
+ return;
+ }
+
+ LogicalNode node = plan;
+ ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+ s.add(node);
+ while (!s.isEmpty()) {
+ node = s.remove(s.size()-1);
+ if (node instanceof UnaryNode) {
+ UnaryNode unary = (UnaryNode) node;
+ s.add(s.size(), unary.getChild());
+ } else if (node instanceof BinaryNode) {
+ BinaryNode binary = (BinaryNode) node;
+ if (binary.getType() == NodeType.JOIN) {
+ hasJoinPlan = true;
+ } else if (binary.getType() == NodeType.UNION) {
+ hasUnionPlan = true;
+ }
+ s.add(s.size(), binary.getLeftChild());
+ s.add(s.size(), binary.getRightChild());
+ } else if (node instanceof ScanNode) {
+ scanlist.add((ScanNode)node);
+ } else if (node instanceof TableSubQueryNode) {
+ TableSubQueryNode subQuery = (TableSubQueryNode) node;
+ s.add(s.size(), subQuery.getSubQuery());
+ }
+ }
+ }
+
+
+ public LogicalNode getPlan() {
+ return plan;
+ }
+
+ public Enforcer getEnforcer() {
+ return enforcer;
+ }
+
+ public StoreTableNode getStoreTableNode() {
+ return store;
+ }
+
+ public ScanNode [] getScanNodes() {
+ return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+ }
+
+ public boolean hasJoin() {
+ return hasJoinPlan;
+ }
+
+ public boolean hasUnion() {
+ return hasUnionPlan;
+ }
+
+ public void addBroadcastTable(String tableName) {
+ broadcasted.add(tableName);
+ enforcer.addBroadcast(tableName);
+ }
+
+ public boolean isBroadcastTable(String tableName) {
+ return broadcasted.contains(tableName);
+ }
+
+ public Collection<String> getBroadcastTables() {
+ return broadcasted;
+ }
+
+ public String toString() {
+ return executionBlockId.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
new file mode 100644
index 0000000..d4ab068
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import java.util.ArrayList;
+import java.util.Stack;
+
+/**
+ * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
+ * This class is a pointer to an ExecutionBlock that the query engine should execute.
+ * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
+ */
+public class ExecutionBlockCursor {
+ private MasterPlan masterPlan;
+ private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
+ private int cursor = 0;
+
+ public ExecutionBlockCursor(MasterPlan plan) {
+ this.masterPlan = plan;
+ buildOrder(plan.getRoot());
+ }
+
+ public int size() {
+ return orderedBlocks.size();
+ }
+
+ // Add all execution blocks in a depth first and postfix order
+ private void buildOrder(ExecutionBlock current) {
+ Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
+ if (!masterPlan.isLeaf(current.getId())) {
+ for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
+ if (!masterPlan.isLeaf(execBlock)) {
+ buildOrder(execBlock);
+ } else {
+ stack.push(execBlock);
+ }
+ }
+ for (ExecutionBlock execBlock : stack) {
+ buildOrder(execBlock);
+ }
+ }
+ orderedBlocks.add(current);
+ }
+
+ public boolean hasNext() {
+ return cursor < orderedBlocks.size();
+ }
+
+ public ExecutionBlock nextBlock() {
+ return orderedBlocks.get(cursor++);
+ }
+
+ public ExecutionBlock peek() {
+ return orderedBlocks.get(cursor);
+ }
+
+ public ExecutionBlock peek(int skip) {
+ return orderedBlocks.get(cursor + skip);
+ }
+
+ public void reset() {
+ cursor = 0;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < orderedBlocks.size(); i++) {
+ if (i == (cursor == 0 ? 0 : cursor - 1)) {
+ sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")");
+ } else {
+ sb.append(orderedBlocks.get(i).getId().getId());
+ }
+
+ if (i < orderedBlocks.size() - 1) {
+ sb.append(",");
+ }
+ }
+
+ return sb.toString();
+ }
+}