You are viewing a plain-text version of this content. The canonical (HTML) version is available in the Apache mailing-list archive.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/07 10:26:47 UTC
[2/2] git commit: TAJO-232: Rename join operators and add other join
operators to PhysicalPlanner. (hyunsik)
TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d7645252
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d7645252
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d7645252
Branch: refs/heads/master
Commit: d7645252fb6bc3152d9774c6a655b8c083ff21b6
Parents: 1023c82
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 7 17:02:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Oct 7 17:03:07 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tajo/algebra/BinaryOperator.java | 8 +-
.../apache/tajo/algebra/ExistsPredicate.java | 44 +++
.../java/org/apache/tajo/algebra/OpType.java | 4 +-
.../org/apache/tajo/algebra/RelationList.java | 4 +-
.../tajo/algebra/SimpleTableSubQuery.java | 41 +++
.../tajo/algebra/TablePrimarySubQuery.java | 54 +++
.../org/apache/tajo/algebra/TableSubQuery.java | 54 ---
.../org/apache/tajo/catalog/TestCatalog.java | 2 +-
.../org/apache/tajo/engine/parser/SQLParser.g4 | 3 +-
.../tajo/engine/parser/HiveConverter.java | 2 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 23 +-
.../tajo/engine/planner/AlgebraVisitor.java | 2 +-
.../tajo/engine/planner/BaseAlgebraVisitor.java | 6 +-
.../tajo/engine/planner/LogicalPlanner.java | 2 +-
.../engine/planner/PhysicalPlannerImpl.java | 288 +++++++++++-----
.../planner/logical/TableSubQueryNode.java | 2 +-
.../physical/BasicPhysicalExecutorVisitor.java | 12 +-
.../planner/physical/FullOuterHashJoinExec.java | 252 --------------
.../physical/FullOuterMergeJoinExec.java | 334 -------------------
.../planner/physical/HashAntiJoinExec.java | 105 ------
.../planner/physical/HashFullOuterJoinExec.java | 252 ++++++++++++++
.../planner/physical/HashLeftAntiJoinExec.java | 110 ++++++
.../planner/physical/HashLeftOuterJoinExec.java | 214 ++++++++++++
.../planner/physical/HashLeftSemiJoinExec.java | 107 ++++++
.../planner/physical/HashSemiJoinExec.java | 101 ------
.../planner/physical/LeftOuterHashJoinExec.java | 214 ------------
.../planner/physical/LeftOuterNLJoinExec.java | 129 -------
.../physical/MergeFullOuterJoinExec.java | 334 +++++++++++++++++++
.../planner/physical/NLLeftOuterJoinExec.java | 129 +++++++
.../physical/PhysicalExecutorVisitor.java | 6 +-
.../tajo/engine/parser/TestSQLAnalyzer.java | 24 ++
.../physical/TestFullOuterHashJoinExec.java | 9 +-
.../physical/TestFullOuterMergeJoinExec.java | 14 +-
.../planner/physical/TestHashAntiJoinExec.java | 8 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../physical/TestLeftOuterHashJoinExec.java | 11 +-
.../physical/TestLeftOuterNLJoinExec.java | 40 +--
.../physical/TestRightOuterHashJoinExec.java | 2 +-
.../src/test/queries/exists_predicate_1.sql | 1 +
.../src/test/queries/exists_predicate_2.sql | 1 +
.../src/test/queries/in_subquery_1.sql | 1 +
.../src/test/queries/in_subquery_2.sql | 1 +
43 files changed, 1600 insertions(+), 1361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f990530..14f935a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,9 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-232: Rename join operators and add other join operators to
+ PhysicalPlanner. (hyunsik)
+
TAJO-229: Implement JoinGraph to represent a graph of relation joins.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
index 3e00e5e..a090623 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
@@ -32,16 +32,16 @@ public class BinaryOperator extends Expr {
this.right = right;
}
- public Expr getLeft() {
- return this.left;
+ public <T extends Expr> T getLeft() {
+ return (T) this.left; // unchecked: T is erased, so a wrong T only fails (ClassCastException) at the caller
}
public void setLeft(Expr left) {
this.left = left;
}
- public Expr getRight() {
- return this.right;
+ public <T extends Expr> T getRight() {
+ return (T) this.right; // unchecked: T is erased, so a wrong T only fails (ClassCastException) at the caller
}
public void setRight(Expr right) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
new file mode 100644
index 0000000..0823807
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+/**
+ * Algebra expression for the {@code [NOT] EXISTS (subquery)} predicate.
+ *
+ * <p>The operand is kept as a {@link SimpleTableSubQuery}; the {@code not} flag records
+ * whether the predicate was written as {@code NOT EXISTS}.
+ */
+public class ExistsPredicate extends Expr {
+  /** The subquery operand of this predicate. */
+  private SimpleTableSubQuery simpleTableSubQuery;
+  /** True for {@code NOT EXISTS}, false for plain {@code EXISTS}. */
+  private boolean not;
+
+  /**
+   * @param simpleTableSubQuery the subquery operand
+   * @param not true for {@code NOT EXISTS}, false for {@code EXISTS}
+   */
+  public ExistsPredicate(SimpleTableSubQuery simpleTableSubQuery, boolean not) {
+    // Must be tagged ExistsPredicate, not InPredicate: visitors switch on the op type and
+    // cast accordingly, so an InPredicate tag would cast this object to InPredicate.
+    super(OpType.ExistsPredicate);
+    this.simpleTableSubQuery = simpleTableSubQuery;
+    this.not = not;
+  }
+
+  /** @return true if this is a {@code NOT EXISTS} predicate */
+  public boolean isNot() {
+    return this.not;
+  }
+
+  /** @return the subquery operand */
+  public SimpleTableSubQuery getSubQuery() {
+    return simpleTableSubQuery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    ExistsPredicate another = (ExistsPredicate) expr;
+    return not == another.not && simpleTableSubQuery.equals(another.simpleTableSubQuery);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
index 145c52c..799b196 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
@@ -28,7 +28,8 @@ public enum OpType {
Relation(Relation.class),
RelationList(RelationList.class),
Rename,
- TableSubQuery(TableSubQuery.class),
+ SimpleTableSubQuery(SimpleTableSubQuery.class),
+ TablePrimaryTableSubQuery(TablePrimarySubQuery.class),
Except(SetOperation.class),
Having(Having.class),
Aggregation(Aggregation.class),
@@ -66,6 +67,7 @@ public enum OpType {
InPredicate(InPredicate.class),
ValueList(ValueListExpr.class),
Is,
+ ExistsPredicate(ExistsPredicate.class),
// string operator or pattern matching predicates
LikePredicate(PatternMatchPredicate.class),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
index 8e9f878..918ff46 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
@@ -33,8 +33,8 @@ public class RelationList extends Expr {
Preconditions.checkArgument(
rel.getType() == OpType.Relation ||
rel.getType() == OpType.Join ||
- rel.getType() == OpType.TableSubQuery,
- "Only Relation, Join, or TableSubQuery can be given to RelationList, but this expr "
+ rel.getType() == OpType.TablePrimaryTableSubQuery,
+ "Only Relation, Join, or TablePrimarySubQuery can be given to RelationList, but this expr "
+ " is " + rel.getType());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
new file mode 100644
index 0000000..91442dc
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+/**
+ * Algebra expression wrapping a subquery used inside a predicate, such as the operand of
+ * {@code EXISTS} or the right-hand side of {@code IN (subquery)}.
+ *
+ * <p>Unlike {@link TablePrimarySubQuery}, it carries no relation name or column aliases.
+ */
+public class SimpleTableSubQuery extends Expr {
+  /** The wrapped subquery expression. */
+  private Expr subquery;
+
+  /** @param subquery the subquery expression to wrap */
+  public SimpleTableSubQuery(Expr subquery) {
+    super(OpType.SimpleTableSubQuery);
+    this.subquery = subquery;
+  }
+
+  /** @return the wrapped subquery expression */
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    // Compare against the other instance's subquery; comparing subquery with itself
+    // would make every pair of SimpleTableSubQuery instances equal.
+    SimpleTableSubQuery another = (SimpleTableSubQuery) expr;
+    return subquery.equals(another.subquery);
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
new file mode 100644
index 0000000..929e9b2
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+/**
+ * Algebra expression for a derived table in a FROM clause: a subquery referenced under a
+ * relation name (e.g. {@code FROM (SELECT ...) name}), optionally with column aliases.
+ */
+public class TablePrimarySubQuery extends Relation {
+  /** The subquery expression producing this derived table. */
+  private Expr subquery;
+  /** Optional column aliases; null when none have been set. */
+  private String [] columnNames;
+
+  /**
+   * @param relName the name under which the subquery result is referenced
+   * @param subquery the subquery expression
+   */
+  public TablePrimarySubQuery(String relName, Expr subquery) {
+    super(OpType.TablePrimaryTableSubQuery, relName);
+    this.subquery = subquery;
+  }
+
+  /** @return true if column aliases have been set */
+  public boolean hasColumnNames() {
+    return this.columnNames != null;
+  }
+
+  /** @param aliasList the column aliases for the subquery's output columns */
+  public void setColumnNames(String[] aliasList) {
+    this.columnNames = aliasList;
+  }
+
+  /** @return the column aliases, or null if none have been set */
+  public String [] getColumnNames() {
+    return columnNames;
+  }
+
+  /** @return the subquery expression */
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    // super.equalsTo checks the Relation part (presumably name/alias - confirm); then compare
+    // the OTHER instance's subquery and column aliases (Arrays.equals is null-safe).
+    TablePrimarySubQuery another = (TablePrimarySubQuery) expr;
+    return super.equalsTo(expr) && subquery.equals(another.subquery)
+        && java.util.Arrays.equals(columnNames, another.columnNames);
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
deleted file mode 100644
index 55c44ba..0000000
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.algebra;
-
-public class TableSubQuery extends Relation {
- private Expr subquery;
- private String [] columnNames;
-
- public TableSubQuery(String relName, Expr subquery) {
- super(OpType.TableSubQuery, relName);
- this.subquery = subquery;
- }
-
- public boolean hasColumnNames() {
- return this.columnNames != null;
- }
-
- public void setColumnNames(String[] aliasList) {
- this.columnNames = aliasList;
- }
-
- public String [] getColumnNames() {
- return columnNames;
- }
-
- public Expr getSubQuery() {
- return subquery;
- }
-
- @Override
- boolean equalsTo(Expr expr) {
- return subquery.equals(subquery);
- }
-
- public String toJson() {
- return JsonHelper.toJson(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index fd1893b..97824b2 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -48,7 +48,7 @@ public class TestCatalog {
public static void setUp() throws Exception {
TajoConf conf = new TajoConf();
- conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db");
+ conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db;create=true");
conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "127.0.0.1:0");
server = new CatalogServer();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 6fc8166..1c6816a 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -831,6 +831,7 @@ predicate
| in_predicate
| pattern_matching_predicate
| null_predicate
+ | exists_predicate
;
/*
@@ -944,7 +945,7 @@ some : SOME | ANY;
*/
exists_predicate
- : EXISTS s=table_subquery
+ : NOT? EXISTS s=table_subquery
;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
index 0442e77..135d146 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
@@ -501,7 +501,7 @@ public class HiveConverter extends HiveParserBaseVisitor<Expr>{
}
}
- TableSubQuery subQuery = new TableSubQuery(tableAlias, current);
+ TablePrimarySubQuery subQuery = new TablePrimarySubQuery(tableAlias, current);
current = subQuery;
}
// TODO: implement lateralView
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 021e0ba..49494a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -400,7 +400,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
return relation;
} else if (ctx.derived_table() != null) {
- return new TableSubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
+ return new TablePrimarySubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
} else {
return null;
}
@@ -628,13 +628,17 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
@Override
- public ValueListExpr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) {
- int size = ctx.in_value_list().row_value_expression().size();
- Expr [] exprs = new Expr[size];
- for (int i = 0; i < size; i++) {
- exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+ public Expr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) { // IN (value list) or IN (subquery)
+ if (checkIfExist(ctx.in_value_list())) { // value-list form: one Expr per row value expression
+ int size = ctx.in_value_list().row_value_expression().size();
+ Expr [] exprs = new Expr[size];
+ for (int i = 0; i < size; i++) {
+ exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+ }
+ return new ValueListExpr(exprs);
+ } else { // subquery form. NOTE(review): visitExists_predicate uses visitTable_subquery, not visitChildren; confirm both yield the same Expr
+ return new SimpleTableSubQuery(visitChildren(ctx.table_subquery()));
}
- return new ValueListExpr(exprs);
}
@Override
@@ -692,6 +696,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
@Override
+ public ExistsPredicate visitExists_predicate(SQLParser.Exists_predicateContext ctx) { // [NOT] EXISTS (subquery); ctx.NOT() is non-null only for NOT EXISTS
+ return new ExistsPredicate(new SimpleTableSubQuery(visitTable_subquery(ctx.table_subquery())), ctx.NOT() != null);
+ }
+
+ @Override
public ColumnReferenceExpr visitColumn_reference(SQLParser.Column_referenceContext ctx) {
ColumnReferenceExpr column = new ColumnReferenceExpr(ctx.name.getText());
if (ctx.tb_name != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
index 633609a..345320c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
@@ -34,7 +34,7 @@ public interface AlgebraVisitor<CONTEXT, RESULT> {
RESULT visitExcept(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
RESULT visitIntersect(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
RESULT visitRelationList(CONTEXT ctx, Stack<OpType> stack, RelationList expr) throws PlanningException;
- RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException;
+ RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException;
RESULT visitRelation(CONTEXT ctx, Stack<OpType> stack, Relation expr) throws PlanningException;
RESULT visitCreateTable(CONTEXT ctx, Stack<OpType> stack, CreateTable expr) throws PlanningException;
RESULT visitDropTable(CONTEXT ctx, Stack<OpType> stack, DropTable expr) throws PlanningException;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 06ab38c..f974e1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -82,8 +82,8 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
case RelationList:
current = visitRelationList(ctx, stack, (RelationList) expr);
break;
- case TableSubQuery:
- current = visitTableSubQuery(ctx, stack, (TableSubQuery) expr);
+ case TablePrimaryTableSubQuery:
+ current = visitTableSubQuery(ctx, stack, (TablePrimarySubQuery) expr);
break;
case Relation:
current = visitRelation(ctx, stack, (Relation) expr);
@@ -209,7 +209,7 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
}
@Override
- public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+ public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException {
stack.push(expr.getType());
RESULT child = visitChild(ctx, stack, expr.getSubQuery());
stack.pop();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 359dda3..d083ac1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -145,7 +145,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
}
- public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr)
+ public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TablePrimarySubQuery expr)
throws PlanningException {
QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
PlanContext newContext = new PlanContext(context.plan, newBlock);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c6e4695..14514a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -211,34 +211,150 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_SEMI:
+ return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+
case RIGHT_SEMI:
+ return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_ANTI:
+ return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+
case RIGHT_ANTI:
+ return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
default:
throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
}
}
- private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ // Cross join: use the enforced algorithm (NL or BNL) if one is set; otherwise Block Nested Loop Join.
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+ switch (algorithm) {
+ case NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, plan, leftExec, rightExec);
+ case BLOCK_NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ default:
+ // Other enforced algorithms are not handled for cross joins: log and fall back to BNL join.
+ LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ }
+
+ } else {
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ // Use the enforced inner-join algorithm if one is set; otherwise choose by estimated input size.
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+ switch (algorithm) {
+ case NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, plan, leftExec, rightExec);
+ case BLOCK_NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+ return new HashJoinExec(context, plan, leftExec, rightExec);
+ case MERGE_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+ return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ case HYBRID_HASH_JOIN:
+ // No hybrid hash join executor is chosen here; this case falls through to the merge-join fallback.
+ default:
+ LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ }
+ // NOTE(review): MERGE_JOIN logs here and again inside createMergeInnerJoin; one log line may suffice.
+
+ } else {
+ return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ long leftSize = estimateSizeRecursive(context, leftLineage);
+ long rightSize = estimateSizeRecursive(context, rightLineage);
+
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
+ // In-memory hash join if either estimated input is below the threshold; otherwise sort-merge join.
+ boolean hashJoin = false;
+ if (leftSize < threshold || rightSize < threshold) {
+ hashJoin = true;
+ }
+
+ if (hashJoin) {
+ PhysicalExec selectedOuter;
+ PhysicalExec selectedInner;
+ // The smaller estimated input becomes the inner side (ties pick leftExec).
+ // HashJoinExec loads the inner relation to memory.
+ if (leftSize <= rightSize) {
+ selectedInner = leftExec;
+ selectedOuter = rightExec;
+ } else {
+ selectedInner = rightExec;
+ selectedOuter = leftExec;
+ }
+
+ LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
+ return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+ } else {
+ return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+ plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+ ExternalSortExec outerSort = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
+ leftExec);
+ ExternalSortExec innerSort = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
+ rightExec);
+ // Merge the two externally sorted inputs (left keys: sortSpecs[0], right keys: sortSpecs[1]).
+ LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+ return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+ }
+
+ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
case NESTED_LOOP_JOIN:
//the right operand is too large, so we opt for NL implementation of left outer join
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+ return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
default:
LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
}
} else {
return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
@@ -253,12 +369,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
if (rightTableVolume < threshold) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
}
else {
//the right operand is too large, so we opt for NL implementation of left outer join
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+ return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
}
}
@@ -270,7 +386,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
long outerSize4 = estimateSizeRecursive(context, outerLineage4);
if (outerSize4 < threshold){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
@@ -290,7 +406,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
@@ -298,7 +414,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
case MERGE_JOIN:
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
default:
@@ -312,7 +428,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
@@ -334,7 +450,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
@@ -354,10 +470,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedRight = leftExec;
}
LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
- return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+ return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
}
- private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
// if size too large, full outer merge join implementation
@@ -369,7 +485,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
- return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+ return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
}
private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -386,114 +502,104 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ /**
+ * Left semi join means that the left side is the IN side table, and the right side is the FROM side table
+ * (NOTE(review): confirm against HashLeftSemiJoinExec's constructor parameter roles).
+ * leftExec and rightExec are passed to HashLeftSemiJoinExec in that order. IN_MEMORY_HASH_JOIN is the only
+ * supported algorithm: it is chosen when no algorithm is enforced, and any other enforced algorithm is
+ * logged as invalid and falls back to it.
+ */
+ private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
switch (algorithm) {
- case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
- case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+
default:
- // fallback algorithm
- LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback left semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+ // Fall back to the same semi-join operator: switching to an outer-join operator here would change
+ // the query result, not just the physical algorithm.
+ return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
}
-
} else {
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
}
}
- private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ /**
+ * Right semi join means that the left side is the FROM side table, and the right side is the IN side table.
+ * It is planned as a left semi join with the inputs swapped: rightExec and leftExec are passed to
+ * HashLeftSemiJoinExec in that order. IN_MEMORY_HASH_JOIN is the only supported algorithm: it is chosen
+ * when no algorithm is enforced, and any other enforced algorithm is logged as invalid and falls back to it.
+ */
+ private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
switch (algorithm) {
- case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
- case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
case IN_MEMORY_HASH_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
- return new HashJoinExec(context, plan, leftExec, rightExec);
- case MERGE_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
- return createMergeJoin(context, plan, leftExec, rightExec);
- case HYBRID_HASH_JOIN:
+ LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
default:
- LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
- LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createMergeJoin(context, plan, leftExec, rightExec);
+ LOG.error("Invalid Right Semi Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback right semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+ // Fall back to the same (input-swapped) semi-join operator: an outer-join operator would change the
+ // query result, not just the physical algorithm.
+ return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
}
-
-
} else {
- return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+ LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
}
}
- private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
- long leftSize = estimateSizeRecursive(context, leftLineage);
- long rightSize = estimateSizeRecursive(context, rightLineage);
-
- final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
-
- boolean hashJoin = false;
- if (leftSize < threshold || rightSize < threshold) {
- hashJoin = true;
- }
-
- if (hashJoin) {
- PhysicalExec selectedOuter;
- PhysicalExec selectedInner;
+ /**
+ * Plans a left anti join. leftExec and rightExec are passed to HashLeftAntiJoinExec in that order, the same
+ * input order createLeftSemiJoinPlan uses for HashLeftSemiJoinExec. IN_MEMORY_HASH_JOIN is the only
+ * supported algorithm: it is chosen when no algorithm is enforced, and any other enforced algorithm is
+ * logged as invalid and falls back to it.
+ * NOTE(review): whether the left side is the FROM or the IN side table is unsettled here; the comment on
+ * createLeftSemiJoinPlan calls the left input the IN side for the same input order. Confirm against
+ * HashLeftAntiJoinExec's constructor parameters.
+ */
+ private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+ switch (algorithm) {
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
- // HashJoinExec loads the inner relation to memory.
- if (leftSize <= rightSize) {
- selectedInner = leftExec;
- selectedOuter = rightExec;
- } else {
- selectedInner = rightExec;
- selectedOuter = leftExec;
+ default:
+ LOG.error("Invalid Left Anti Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback left anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+ return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
}
-
- LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
- return new HashJoinExec(context, plan, selectedOuter, selectedInner);
} else {
- return createMergeJoin(context, plan, leftExec, rightExec);
+ LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
}
}
- private MergeJoinExec createMergeJoin(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
- plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
- ExternalSortExec outerSort = new ExternalSortExec(context, sm,
- new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
- leftExec);
- ExternalSortExec innerSort = new ExternalSortExec(context, sm,
- new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
- rightExec);
+ /**
+ * Plans a right anti join as a left anti join with the inputs swapped: rightExec and leftExec are passed to
+ * HashLeftAntiJoinExec in that order, mirroring how createRightSemiJoinPlan swaps inputs for
+ * HashLeftSemiJoinExec. IN_MEMORY_HASH_JOIN is the only supported algorithm: it is chosen when no algorithm
+ * is enforced, and any other enforced algorithm is logged as invalid and falls back to it.
+ */
+ private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+ switch (algorithm) {
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ // Must be the anti-join operator: a semi-join operator would produce the complement of the
+ // intended result.
+ return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
- LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
- return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+ default:
+ LOG.error("Invalid Right Anti Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback right anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+ return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
+ }
+ } else {
+ LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
+ }
}
public PhysicalExec createStorePlan(TaskAttemptContext ctx,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index 39ae1e2..846ed41 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -79,7 +79,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
+ /**
+ * Builds this node's plan string: the label followed by " as " and tableName.
+ * NOTE(review): the label becomes "TablePrimarySubQuery" while this node class is still TableSubQueryNode;
+ * confirm the new label is intended and not a side effect of renaming the algebra class.
+ */
@Override
public PlanString getPlanString() {
- PlanString planStr = new PlanString("TableSubQuery");
+ PlanString planStr = new PlanString("TablePrimarySubQuery");
planStr.appendTitle(" as ").appendTitle(tableName);
return planStr;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index fc569d7..4723ecc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -38,10 +38,10 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
return visitProjection((ProjectionExec) exec, stack, context);
} else if (exec instanceof HashJoinExec) {
return visitHashJoin((HashJoinExec) exec, stack, context);
- } else if (exec instanceof HashAntiJoinExec) {
- return visitHashAntiJoin((HashAntiJoinExec) exec, stack, context);
- } else if (exec instanceof HashSemiJoinExec) {
- return visitHashSemiJoin((HashSemiJoinExec) exec, stack, context);
+ } else if (exec instanceof HashLeftAntiJoinExec) {
+ return visitHashAntiJoin((HashLeftAntiJoinExec) exec, stack, context);
+ } else if (exec instanceof HashLeftSemiJoinExec) {
+ return visitHashSemiJoin((HashLeftSemiJoinExec) exec, stack, context);
} else if (exec instanceof LimitExec) {
return visitLimit((LimitExec) exec, stack, context);
} else {
@@ -108,13 +108,13 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
}
+ /** Visits a HashLeftSemiJoinExec by delegating to visitBinaryExecutor with the same stack and context. */
@Override
- public RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
throws PhysicalPlanningException {
return visitBinaryExecutor(exec, stack, context);
}
+ /** Visits a HashLeftAntiJoinExec by delegating to visitBinaryExecutor with the same stack and context. */
@Override
- public RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+ public RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
throws PhysicalPlanningException {
return visitBinaryExecutor(exec, stack, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
deleted file mode 100644
index aebfdb5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.*;
-
-
-public class FullOuterHashJoinExec extends BinaryPhysicalExec {
- // from logical plan
- protected JoinNode plan;
- protected EvalNode joinQual;
-
- protected List<Column[]> joinKeyPairs;
-
- // temporal tuples and states for nested loop join
- protected boolean first = true;
- protected FrameTuple frameTuple;
- protected Tuple outTuple = null;
- protected Map<Tuple, List<Tuple>> tupleSlots;
- protected Iterator<Tuple> iterator = null;
- protected EvalContext qualCtx;
- protected Tuple leftTuple;
- protected Tuple leftKeyTuple;
-
- protected int [] leftKeyList;
- protected int [] rightKeyList;
-
- protected boolean finished = false;
- protected boolean shouldGetLeftTuple = true;
-
- // projection
- protected final Projector projector;
- protected final EvalContext [] evalContexts;
-
- private int rightNumCols;
- private int leftNumCols;
- private Map<Tuple, Boolean> matched;
-
- public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
- PhysicalExec inner) {
- super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
- plan.getOutSchema(), outer, inner);
- this.plan = plan;
- this.joinQual = plan.getJoinQual();
- this.qualCtx = joinQual.newContext();
- this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
- // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
- // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
- this.matched = new HashMap<Tuple, Boolean>(10000);
-
- this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
- outer.getSchema(), inner.getSchema());
-
- leftKeyList = new int[joinKeyPairs.size()];
- rightKeyList = new int[joinKeyPairs.size()];
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
- }
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
- }
-
- // for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
-
- // for join
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.getColumnNum());
- leftKeyTuple = new VTuple(leftKeyList.length);
-
- leftNumCols = outer.getSchema().getColumnNum();
- rightNumCols = inner.getSchema().getColumnNum();
- }
-
- protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
- for (int i = 0; i < leftKeyList.length; i++) {
- keyTuple.put(i, outerTuple.get(leftKeyList[i]));
- }
- }
-
- public Tuple getNextUnmatchedRight() {
-
- List<Tuple> newValue;
- Tuple returnedTuple;
- // get a keyTUple from the matched hashmap with a boolean false value
- for(Tuple aKeyTuple : matched.keySet()) {
- if(matched.get(aKeyTuple) == false) {
- newValue = tupleSlots.get(aKeyTuple);
- returnedTuple = newValue.remove(0);
- tupleSlots.put(aKeyTuple, newValue);
-
- // after taking the last element from the list in tupleSlots, set flag true in matched as well
- if(newValue.isEmpty()){
- matched.put(aKeyTuple, true);
- }
-
- return returnedTuple;
- }
- }
- return null;
- }
-
- public Tuple next() throws IOException {
- if (first) {
- loadRightToHashTable();
- }
-
- Tuple rightTuple;
- boolean found = false;
-
- while(!finished) {
- if (shouldGetLeftTuple) { // initially, it is true.
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
- Tuple unmatchedRightTuple = getNextUnmatchedRight();
- if( unmatchedRightTuple == null) {
- finished = true;
- outTuple = null;
- return null;
- } else {
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
- frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
-
- return outTuple;
- }
- }
-
- // getting corresponding right
- getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
- if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
- iterator = tupleSlots.get(leftKeyTuple).iterator();
- shouldGetLeftTuple = false;
- } else {
- //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
- //output a tuple with the nulls padded rightTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- shouldGetLeftTuple = true;
- return outTuple;
- }
- }
-
- // getting a next right tuple on in-memory hash table.
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
- if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- found = true;
- getKeyLeftTuple(leftTuple, leftKeyTuple);
- matched.put(leftKeyTuple, true);
- }
-
- if (!iterator.hasNext()) { // no more right tuples for this hash key
- shouldGetLeftTuple = true;
- }
-
- if (found) {
- break;
- }
- }
- return outTuple;
- }
-
- protected void loadRightToHashTable() throws IOException {
- Tuple tuple;
- Tuple keyTuple;
-
- while ((tuple = rightChild.next()) != null) {
- keyTuple = new VTuple(joinKeyPairs.size());
- List<Tuple> newValue;
- for (int i = 0; i < rightKeyList.length; i++) {
- keyTuple.put(i, tuple.get(rightKeyList[i]));
- }
-
- if (tupleSlots.containsKey(keyTuple)) {
- newValue = tupleSlots.get(keyTuple);
- newValue.add(tuple);
- tupleSlots.put(keyTuple, newValue);
- } else {
- newValue = new ArrayList<Tuple>();
- newValue.add(tuple);
- tupleSlots.put(keyTuple, newValue);
- matched.put(keyTuple,false);
- }
- }
- first = false;
- }
-
- @Override
- public void rescan() throws IOException {
- super.rescan();
-
- tupleSlots.clear();
- first = true;
-
- finished = false;
- iterator = null;
- shouldGetLeftTuple = true;
- }
-
- public void close() throws IOException {
- tupleSlots.clear();
- }
-
- public JoinNode getPlan() {
- return this.plan;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
deleted file mode 100644
index 2c11fea..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
- // from logical plan
- private JoinNode joinNode;
- private EvalNode joinQual;
- private EvalContext qualCtx;
-
- // temporal tuples and states for nested loop join
- private FrameTuple frameTuple;
- private Tuple leftTuple = null;
- private Tuple rightTuple = null;
- private Tuple outTuple = null;
- private Tuple leftNext = null;
-
- private final List<Tuple> leftTupleSlots;
- private final List<Tuple> rightTupleSlots;
-
- private JoinTupleComparator joincomparator = null;
- private TupleComparator[] tupleComparator = null;
-
- private final static int INITIAL_TUPLE_SLOT = 10000;
-
- private boolean end = false;
-
- // projection
- private final Projector projector;
- private final EvalContext [] evalContexts;
-
- private int rightNumCols;
- private int leftNumCols;
- private int posRightTupleSlots = -1;
- private int posLeftTupleSlots = -1;
- boolean endInPopulationStage = false;
- private boolean initRightDone = false;
-
- public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
- PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
- super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
- Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
- "but there is no join condition");
- this.joinNode = plan;
- this.joinQual = plan.getJoinQual();
- this.qualCtx = this.joinQual.newContext();
-
- this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
- this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
- SortSpec[][] sortSpecs = new SortSpec[2][];
- sortSpecs[0] = leftSortKey;
- sortSpecs[1] = rightSortKey;
-
- this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
- rightChild.getSchema(), sortSpecs);
- this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
- plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
-
- // for projection
- this.projector = new Projector(inSchema, outSchema, plan.getTargets());
- this.evalContexts = projector.renew();
-
- // for join
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.getColumnNum());
-
- leftNumCols = leftChild.getSchema().getColumnNum();
- rightNumCols = rightChild.getSchema().getColumnNum();
- }
-
- public JoinNode getPlan(){
- return this.joinNode;
- }
-
- public Tuple next() throws IOException {
- Tuple previous;
-
- for (;;) {
- boolean newRound = false;
- if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
- newRound = true;
- }
- if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
- newRound = true;
- }
-
- if(newRound == true){
-
- if (end) {
-
- ////////////////////////////////////////////////////////////////////////
- // FINALIZING STAGE
- ////////////////////////////////////////////////////////////////////////
- // the finalizing stage, where remaining tuples on the right are
- // transformed into left-padded results while tuples on the left
- // are transformed into right-padded results
-
- // before exit, a left-padded tuple should be built for all remaining
- // right side and a right-padded tuple should be built for all remaining
- // left side
-
- if (initRightDone == false) {
- // maybe the left operand was empty => the right one didn't have the chance to initialize
- rightTuple = rightChild.next();
- initRightDone = true;
- }
-
- if((leftTuple == null) && (rightTuple == null)) {
- return null;
- }
-
- if((leftTuple == null) && (rightTuple != null)){
- // output a tuple with the nulls padded leftTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
- frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- rightTuple = rightChild.next();
- return outTuple;
- }
-
- if((leftTuple != null) && (rightTuple == null)){
- // output a tuple with the nulls padded leftTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- leftTuple = leftChild.next();
- return outTuple;
- }
- } // if end
-
- ////////////////////////////////////////////////////////////////////////
- // INITIALIZING STAGE
- ////////////////////////////////////////////////////////////////////////
- // initializing stage, reading the first tuple on each side
- if (leftTuple == null) {
- leftTuple = leftChild.next();
- if( leftTuple == null){
- end = true;
- continue;
- }
- }
- if (rightTuple == null) {
- rightTuple = rightChild.next();
- initRightDone = true;
- if (rightTuple == null) {
- end = true;
- continue;
- }
- }
-
- // reset tuple slots for a new round
- leftTupleSlots.clear();
- rightTupleSlots.clear();
- posRightTupleSlots = -1;
- posLeftTupleSlots = -1;
-
- ////////////////////////////////////////////////////////////////////////
- // Comparison and Move Forward Stage
- ////////////////////////////////////////////////////////////////////////
- // advance alternatively on each side until a match is found
- int cmp;
- while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
-
- if (cmp > 0) {
-
- //before getting a new tuple from the right, a leftnullpadded tuple should be built
- //output a tuple with the nulls padded leftTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
- frameTuple.set(nullPaddedTuple, rightTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- // BEFORE RETURN, MOVE FORWARD
- rightTuple = rightChild.next();
- if(rightTuple == null) {
- end = true;
- }
-
- return outTuple;
-
- } else if (cmp < 0) {
- // before getting a new tuple from the left, a rightnullpadded tuple should be built
- // output a tuple with the nulls padded rightTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- // BEFORE RETURN, MOVE FORWARD
- leftTuple = leftChild.next();
- if(leftTuple == null) {
- end = true;
- }
-
- return outTuple;
-
- } // if (cmp < 0)
- } //while
-
-
- ////////////////////////////////////////////////////////////////////////
- // SLOTS POPULATION STAGE
- ////////////////////////////////////////////////////////////////////////
- // once a match is found, retain all tuples with this key in tuple slots
- // on each side
- if(!end) {
- endInPopulationStage = false;
-
- boolean endLeft = false;
- boolean endRight = false;
-
- previous = new VTuple(leftTuple);
- do {
- leftTupleSlots.add(new VTuple(leftTuple));
- leftTuple = leftChild.next();
- if(leftTuple == null) {
- endLeft = true;
- }
-
-
- } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
- posLeftTupleSlots = 0;
-
-
- previous = new VTuple(rightTuple);
- do {
- rightTupleSlots.add(new VTuple(rightTuple));
- rightTuple = rightChild.next();
- if(rightTuple == null) {
- endRight = true;
- }
-
- } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
- posRightTupleSlots = 0;
-
- if ((endLeft == true) || (endRight == true)) {
- end = true;
- endInPopulationStage = true;
- }
-
- } // if end false
- } // if newRound
-
-
- ////////////////////////////////////////////////////////////////////////
- // RESULTS STAGE
- ////////////////////////////////////////////////////////////////////////
- // now output result matching tuples from the slots
- // if either we haven't reached end on neither side, or we did reach end
- // on one(or both) sides but that happened in the slots population step
- // (i.e. refers to next round)
- if(!end || (end && endInPopulationStage)){
- if(posLeftTupleSlots == 0){
- leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
- posLeftTupleSlots = posLeftTupleSlots + 1;
- }
-
- if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
- Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
- posRightTupleSlots = posRightTupleSlots + 1;
- frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- return outTuple;
- } else {
- // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
- if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
- //rewind the right slots position
- posRightTupleSlots = 0;
- Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
- posRightTupleSlots = posRightTupleSlots + 1;
- leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
- posLeftTupleSlots = posLeftTupleSlots + 1;
-
- frameTuple.set(leftNext, aTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- return outTuple;
- }
- }
- } // the second if end false
- } // for
- }
-
-
- @Override
- public void rescan() throws IOException { // delegates to super.rescan(), then empties both tuple slots and resets their positions to -1
- super.rescan(); // NOTE(review): end, initRightDone, endInPopulationStage, leftTuple and rightTuple (used by next(), presumably fields) are not reset here; confirm a rescan restarts the merge cleanly
- leftTupleSlots.clear();
- rightTupleSlots.clear();
- posRightTupleSlots = -1;
- posLeftTupleSlots = -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
deleted file mode 100644
index 8f2d115..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-
-public class HashAntiJoinExec extends HashJoinExec { // anti join: emits each left tuple for which no right tuple satisfies the join qual, null-padded on the right
- private Tuple rightNullTuple;
-
- public HashAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
- PhysicalExec right) {
- super(context, plan, left, right);
- // Null tuple padding the right side of emitted rows. NOTE(review): sized by leftChild.outColumnNum rather than the right child's column count; looks like a bug, confirm.
- rightNullTuple = new VTuple(leftChild.outColumnNum);
- for (int i = 0; i < leftChild.outColumnNum; i++) {
- rightNullTuple.put(i, NullDatum.get());
- }
- }
-
- /**
- * Returns the next left tuple that has no matching right tuple, padded with nulls on the right.
- * The end of tuples (EOT) is reached only when the left relation has no more tuples.
- *
- * For each left tuple, next() looks up its join key in the hash table. If no bucket exists, the
- * left tuple is returned immediately. Otherwise the bucket's tuples are checked against the join
- * qual in order; if none of them satisfies it, the left tuple is returned.
- *
- * @return the next left tuple unmatched by the join condition, or null once the left input is exhausted.
- * @throws IOException propagated from the child executors.
- */
- public Tuple next() throws IOException {
- if (first) {
- loadRightToHashTable();
- }
-
- Tuple rightTuple;
- boolean notFound;
-
- while(!finished) {
-
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- finished = true;
- return null;
- }
-
- // Try to find a hash bucket in in-memory hash table
- getKeyLeftTuple(leftTuple, leftKeyTuple);
- if (tupleSlots.containsKey(leftKeyTuple)) {
- // if found, it gets a hash bucket from the hash table.
- iterator = tupleSlots.get(leftKeyTuple).iterator();
- } else {
- // no bucket for this key: the left tuple has no match, so emit it null-padded on the right.
- frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- return outTuple;
- }
-
- // A bucket was found: scan its tuples until one satisfies the join qual.
- // A single satisfying tuple is enough to exclude this left tuple, so the scan stops there.
- notFound = true;
- while (notFound && iterator.hasNext()) {
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple);
- joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
- notFound = false;
- }
- }
-
- if (notFound) { // if there is no matched tuple
- frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(evalContexts, frameTuple);
- projector.terminate(evalContexts, outTuple);
- break;
- }
- }
-
- return outTuple; // NOTE(review): also reached when finished is already true on entry, returning the previous outTuple instead of null
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
new file mode 100644
index 0000000..ac591d3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Hash-based FULL OUTER JOIN executor.
+ *
+ * <p>The right (inner) child is loaded completely into an in-memory hash table keyed by the
+ * equi-join key columns. Each left (outer) tuple then probes that table:
+ * <ul>
+ *   <li>it is joined with every right tuple of its bucket that satisfies the join qual;</li>
+ *   <li>if no right tuple satisfies the qual (no bucket, or none in the bucket qualifies),
+ *       it is emitted once, padded with nulls on the right.</li>
+ * </ul>
+ * After the left child is exhausted, every right tuple that never satisfied the join qual is
+ * emitted padded with nulls on the left.
+ */
+public class HashFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // probing state of the hash join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  /** In-memory hash table of the right relation: join key -> right tuples with that key. */
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  /** Cursor over the bucket currently probed by {@link #leftTuple}. */
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+
+  // null-padded tuples used to fill the missing side of an unmatched tuple
+  private final Tuple leftNullTuple;
+  private final Tuple rightNullTuple;
+
+  // Right tuples that satisfied the join qual with at least one left tuple. Identity-based;
+  // assumes rightChild.next() returns a distinct object per row, which storing the rows in
+  // tupleSlots already relies on.
+  private final Set<Tuple> matchedRightTuples;
+
+  // true once the left child has returned null; it is not polled again afterwards
+  private boolean leftFinished = false;
+  // whether the current leftTuple has joined with at least one right tuple so far
+  private boolean leftTupleMatched = false;
+
+  // cursors used to emit the unmatched right tuples once the left side is exhausted
+  private Iterator<List<Tuple>> unmatchedBucketIterator = null;
+  private Iterator<Tuple> unmatchedTupleIterator = null;
+
+  public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+    this.matchedRightTuples = Collections.newSetFromMap(new IdentityHashMap<Tuple, Boolean>());
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
+        outer.getSchema(), inner.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = outer.getSchema().getColumnNum();
+    rightNumCols = inner.getSchema().getColumnNum();
+    leftNullTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+    rightNullTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+  }
+
+  /** Copies the join-key columns of {@code outerTuple} into {@code keyTuple}. */
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  /**
+   * Returns the next right tuple that has not satisfied the join qual with any left tuple,
+   * or {@code null} when none is left. Successive calls walk the hash table once without
+   * modifying it; meant to be called only after the left child is exhausted.
+   */
+  public Tuple getNextUnmatchedRight() {
+    if (unmatchedBucketIterator == null) {
+      unmatchedBucketIterator = tupleSlots.values().iterator();
+    }
+    while (true) {
+      if (unmatchedTupleIterator != null) {
+        while (unmatchedTupleIterator.hasNext()) {
+          Tuple candidate = unmatchedTupleIterator.next();
+          if (!matchedRightTuples.contains(candidate)) {
+            return candidate;
+          }
+        }
+      }
+      if (!unmatchedBucketIterator.hasNext()) {
+        return null;
+      }
+      unmatchedTupleIterator = unmatchedBucketIterator.next().iterator();
+    }
+  }
+
+  /**
+   * Returns the next tuple of the full outer join, or {@code null} once the join is complete.
+   * The returned object is reused by subsequent calls.
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    while (!finished) {
+      if (shouldGetLeftTuple) {
+        if (!leftFinished) {
+          leftTuple = leftChild.next();
+          leftFinished = (leftTuple == null);
+        }
+
+        if (leftFinished) {
+          // left side exhausted: emit the right tuples that never joined, null-padded on the left
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if (unmatchedRightTuple == null) {
+            finished = true;
+            return null;
+          }
+          return projectPair(leftNullTuple, unmatchedRightTuple);
+        }
+
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
+        List<Tuple> bucket = tupleSlots.get(leftKeyTuple);
+        if (bucket == null) {
+          // no right tuple shares this key: keep the left tuple, null-padded on the right
+          return projectPair(leftTuple, rightNullTuple);
+        }
+        iterator = bucket.iterator();
+        leftTupleMatched = false;
+        shouldGetLeftTuple = false;
+      }
+
+      // Buckets are never empty and shouldGetLeftTuple is raised as soon as the cursor runs
+      // dry, so iterator.next() is safe here.
+      Tuple rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple);
+      // Sharing a hash key does not by itself make the pair joinable (presumably NULL keys or
+      // conjuncts beyond the extracted key pairs), so the full join qual is evaluated per pair.
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      boolean joinable = joinQual.terminate(qualCtx).asBool();
+
+      if (!iterator.hasNext()) {
+        shouldGetLeftTuple = true;
+      }
+
+      if (joinable) {
+        matchedRightTuples.add(rightTuple);
+        leftTupleMatched = true;
+        return projectPair(leftTuple, rightTuple);
+      }
+
+      if (shouldGetLeftTuple && !leftTupleMatched) {
+        // the whole bucket rejected this left tuple: keep it, null-padded on the right
+        return projectPair(leftTuple, rightNullTuple);
+      }
+    }
+    return null;
+  }
+
+  /** Evaluates the projection over the pair (left, right) into {@link #outTuple}. */
+  private Tuple projectPair(Tuple left, Tuple right) {
+    frameTuple.set(left, right);
+    projector.eval(evalContexts, frameTuple);
+    projector.terminate(evalContexts, outTuple);
+    return outTuple;
+  }
+
+  /** Loads every tuple of the right child into {@link #tupleSlots}, grouped by join key. */
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    while ((tuple = rightChild.next()) != null) {
+      Tuple keyTuple = new VTuple(joinKeyPairs.size());
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      List<Tuple> bucket = tupleSlots.get(keyTuple);
+      if (bucket == null) {
+        bucket = new ArrayList<Tuple>();
+        tupleSlots.put(keyTuple, bucket);
+      }
+      bucket.add(tuple);
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    matchedRightTuples.clear();
+    first = true;
+
+    finished = false;
+    leftFinished = false;
+    leftTupleMatched = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+    unmatchedBucketIterator = null;
+    unmatchedTupleIterator = null;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+    matchedRightTuples.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+