You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/02 11:18:02 UTC
git commit: TAJO-142: Implement hash anti-join operator. (hyunsik)
Updated Branches:
refs/heads/master 205621168 -> 0f3965a00
TAJO-142: Implement hash anti-join operator. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0f3965a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0f3965a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0f3965a0
Branch: refs/heads/master
Commit: 0f3965a0005fb6f041d487b5ce75f3ef6dc2d8e3
Parents: 2056211
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 2 18:13:23 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 2 18:14:34 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/algebra/JoinType.java | 8 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 2 +-
.../tajo/engine/planner/LogicalPlanner.java | 2 +-
.../engine/planner/PhysicalPlannerImpl.java | 14 +-
.../engine/planner/physical/BNLJoinExec.java | 16 +-
.../planner/physical/BinaryPhysicalExec.java | 28 +--
.../planner/physical/HashAntiJoinExec.java | 105 +++++++++++
.../engine/planner/physical/HashJoinExec.java | 94 ++++-----
.../engine/planner/physical/MergeJoinExec.java | 14 +-
.../engine/planner/physical/NLJoinExec.java | 6 +-
.../tajo/engine/planner/physical/UnionExec.java | 4 +-
.../planner/rewrite/FilterPushDownRule.java | 2 +-
.../planner/physical/TestBNLJoinExec.java | 22 +--
.../planner/physical/TestHashAntiJoinExec.java | 189 +++++++++++++++++++
.../planner/physical/TestHashJoinExec.java | 10 +-
.../planner/physical/TestMergeJoinExec.java | 8 +-
.../engine/planner/physical/TestNLJoinExec.java | 8 +-
18 files changed, 417 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c871c83..635205a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-142: Implement hash anti-join operator. (hyunsik)
+
TAJO-94: Remove duplicate proto files. (hyunsik)
TAJO-141: Set on demand as the default cluster mode. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
index 3e0a137..b506719 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
@@ -19,10 +19,14 @@
package org.apache.tajo.algebra;
public enum JoinType {
- CROSS_JOIN,
+ CROSS,
INNER,
LEFT_OUTER,
RIGHT_OUTER,
FULL_OUTER,
- UNION
+ UNION,
+ LEFT_ANTI,
+ RIGHT_ANTI,
+ LEFT_SEMI,
+ RIGHT_SEMI
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index aad275d..ed62171 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -299,7 +299,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
public Join visitJoined_table_primary(SQLParser.Joined_table_primaryContext ctx) {
Join join;
if (ctx.CROSS() != null) {
- join = new Join(JoinType.CROSS_JOIN);
+ join = new Join(JoinType.CROSS);
} else if (ctx.UNION() != null) {
join = new Join(JoinType.UNION);
} else { // qualified join or natural
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 2c4f241..f78b367 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -245,7 +245,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
private static LogicalNode createCatasianProduct(LogicalNode left, LogicalNode right) {
- JoinNode join = new JoinNode(JoinType.CROSS_JOIN, left, right);
+ JoinNode join = new JoinNode(JoinType.CROSS, left, right);
Schema joinSchema = SchemaUtil.merge(
join.getLeftChild().getOutSchema(),
join.getRightChild().getOutSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index a2e38b1..06ac2d6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -150,8 +150,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PhysicalExec outer, PhysicalExec inner)
throws IOException {
switch (joinNode.getJoinType()) {
- case CROSS_JOIN:
- LOG.info("The planner chooses NLJoinExec");
+ case CROSS:
+ LOG.info("The planner chooses [Nested Loop Join]");
return new NLJoinExec(ctx, joinNode, outer, inner);
case INNER:
@@ -180,7 +180,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedOuter = outer;
}
- LOG.info("The planner chooses HashJoinExec");
+ LOG.info("The planner chooses [InMemory Hash Join]");
return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
}
@@ -194,7 +194,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
inner);
- LOG.info("The planner chooses MergeJoinExec");
+ LOG.info("The planner chooses [Merge Join]");
return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
sortSpecs[0], sortSpecs[1]);
}
@@ -243,7 +243,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
if (grpColumns.length == 0) {
- LOG.info("The planner chooses HashAggregationExec");
+ LOG.info("The planner chooses [Hash Aggregation]");
return new HashAggregateExec(ctx, groupbyNode, subOp);
} else {
String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
@@ -253,7 +253,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
// if the relation size is less than the reshold,
// the hash aggregation will be used.
if (estimatedSize <= threshold) {
- LOG.info("The planner chooses HashAggregationExec");
+ LOG.info("The planner chooses [Hash Aggregation]");
return new HashAggregateExec(ctx, groupbyNode, subOp);
} else {
SortSpec[] specs = new SortSpec[grpColumns.length];
@@ -266,7 +266,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
// SortExec sortExec = new SortExec(sortNode, child);
ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode,
subOp);
- LOG.info("The planner chooses SortAggregationExec");
+ LOG.info("The planner chooses [Sort Aggregation]");
return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 666d049..694602b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -84,7 +84,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
if (outerTupleSlots.isEmpty()) {
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
- Tuple t = outerChild.next();
+ Tuple t = leftChild.next();
if (t == null) {
outerEnd = true;
break;
@@ -97,7 +97,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
if (innerTupleSlots.isEmpty()) {
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
- Tuple t = innerChild.next();
+ Tuple t = rightChild.next();
if (t == null) {
innerEnd = true;
break;
@@ -107,7 +107,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
innerIterator = innerTupleSlots.iterator();
}
- if((innext = innerChild.next()) == null){
+ if((innext = rightChild.next()) == null){
innerEnd = true;
}
@@ -118,7 +118,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
innerIterator = innerTupleSlots.iterator();
} else {
if (innerEnd) {
- innerChild.rescan();
+ rightChild.rescan();
innerEnd = false;
if (outerEnd) {
@@ -126,7 +126,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
}
outerTupleSlots.clear();
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
- Tuple t = outerChild.next();
+ Tuple t = leftChild.next();
if (t == null) {
outerEnd = true;
break;
@@ -148,7 +148,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
if (innext != null) {
innerTupleSlots.add(innext);
for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill inner
- Tuple t = innerChild.next();
+ Tuple t = rightChild.next();
if (t == null) {
innerEnd = true;
break;
@@ -157,7 +157,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
}
} else {
for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill inner
- Tuple t = innerChild.next();
+ Tuple t = rightChild.next();
if (t == null) {
innerEnd = true;
break;
@@ -166,7 +166,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
}
}
- if ((innext = innerChild.next()) == null) {
+ if ((innext = rightChild.next()) == null) {
innerEnd = true;
}
innerIterator = innerTupleSlots.iterator();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index a17ddf4..e98d505 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -24,40 +24,40 @@ import org.apache.tajo.catalog.Schema;
import java.io.IOException;
public abstract class BinaryPhysicalExec extends PhysicalExec {
- protected final PhysicalExec outerChild;
- protected final PhysicalExec innerChild;
+ protected final PhysicalExec leftChild;
+ protected final PhysicalExec rightChild;
public BinaryPhysicalExec(final TaskAttemptContext context,
final Schema inSchema, final Schema outSchema,
final PhysicalExec outer, final PhysicalExec inner) {
super(context, inSchema, outSchema);
- this.outerChild = outer;
- this.innerChild = inner;
+ this.leftChild = outer;
+ this.rightChild = inner;
}
- public PhysicalExec getOuterChild() {
- return outerChild;
+ public PhysicalExec getLeftChild() {
+ return leftChild;
}
- public PhysicalExec getInnerChild() {
- return innerChild;
+ public PhysicalExec getRightChild() {
+ return rightChild;
}
@Override
public void init() throws IOException {
- outerChild.init();
- innerChild.init();
+ leftChild.init();
+ rightChild.init();
}
@Override
public void rescan() throws IOException {
- outerChild.rescan();
- innerChild.rescan();
+ leftChild.rescan();
+ rightChild.rescan();
}
@Override
public void close() throws IOException {
- outerChild.close();
- innerChild.close();
+ leftChild.close();
+ rightChild.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
new file mode 100644
index 0000000..8f2d115
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+public class HashAntiJoinExec extends HashJoinExec {
+ private Tuple rightNullTuple;
+
+ public HashAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
+ PhysicalExec right) {
+ super(context, plan, left, right);
+ // Null tuple standing in for the right side. NOTE(review): sized from leftChild, not rightChild - confirm.
+ rightNullTuple = new VTuple(leftChild.outColumnNum);
+ for (int i = 0; i < leftChild.outColumnNum; i++) {
+ rightNullTuple.put(i, NullDatum.get());
+ }
+ }
+
+ /**
+ * The End of Tuple (EOT) condition becomes true only when there are no more tuples in the left relation (on disk).
+ * Each call to next() returns a tuple for the next left tuple that has no matching right tuple.
+ *
+ * For each left tuple, next() looks up the hash bucket for its join key in the hash table. If there is no such
+ * bucket, it returns a tuple. If next() finds the bucket, it reads the tuples in the found bucket sequentially.
+ * If none of them satisfies the join condition, it returns a tuple.
+ *
+ * @return A tuple for a left tuple that does not satisfy the given join condition, or null when the left child returns no more tuples.
+ * @throws IOException
+ */
+ public Tuple next() throws IOException {
+ if (first) {
+ loadRightToHashTable();
+ }
+
+ Tuple rightTuple;
+ boolean notFound;
+
+ while(!finished) {
+
+ // getting a new left tuple
+ leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = true;
+ return null;
+ }
+
+ // Try to find a hash bucket in in-memory hash table
+ getKeyLeftTuple(leftTuple, leftKeyTuple);
+ if (tupleSlots.containsKey(leftKeyTuple)) {
+ // if found, it gets a hash bucket from the hash table.
+ iterator = tupleSlots.get(leftKeyTuple).iterator();
+ } else {
+ // if not found, it returns a tuple.
+ frameTuple.set(leftTuple, rightNullTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ return outTuple;
+ }
+
+ // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+ // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
+ notFound = true;
+ while (notFound && iterator.hasNext()) {
+ rightTuple = iterator.next();
+ frameTuple.set(leftTuple, rightTuple);
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+ notFound = false;
+ }
+ }
+
+ if (notFound) { // if there is no matched tuple
+ frameTuple.set(leftTuple, rightNullTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ break;
+ }
+ }
+
+ return outTuple;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index cce5587..355b357 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -35,30 +35,30 @@ import java.util.*;
public class HashJoinExec extends BinaryPhysicalExec {
// from logical plan
- private JoinNode plan;
- private EvalNode joinQual;
+ protected JoinNode plan;
+ protected EvalNode joinQual;
- private List<Column[]> joinKeyPairs;
+ protected List<Column[]> joinKeyPairs;
// temporal tuples and states for nested loop join
- private boolean first = true;
- private FrameTuple frameTuple;
- private Tuple outTuple = null;
- private Map<Tuple, List<Tuple>> tupleSlots;
- private Iterator<Tuple> iterator = null;
- private EvalContext qualCtx;
- private Tuple outerTuple;
- private Tuple outerKeyTuple;
+ protected boolean first = true;
+ protected FrameTuple frameTuple;
+ protected Tuple outTuple = null;
+ protected Map<Tuple, List<Tuple>> tupleSlots;
+ protected Iterator<Tuple> iterator = null;
+ protected EvalContext qualCtx;
+ protected Tuple leftTuple;
+ protected Tuple leftKeyTuple;
- private int [] outerKeyList;
- private int [] innerKeyList;
+ protected int [] leftKeyList;
+ protected int [] rightKeyList;
- private boolean finished = false;
- boolean nextOuter = true;
+ protected boolean finished = false;
+ protected boolean shouldGetLeftTuple = true;
// projection
- private final Projector projector;
- private final EvalContext [] evalContexts;
+ protected final Projector projector;
+ protected final EvalContext [] evalContexts;
public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
PhysicalExec inner) {
@@ -72,15 +72,15 @@ public class HashJoinExec extends BinaryPhysicalExec {
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
outer.getSchema(), inner.getSchema());
- outerKeyList = new int[joinKeyPairs.size()];
- innerKeyList = new int[joinKeyPairs.size()];
+ leftKeyList = new int[joinKeyPairs.size()];
+ rightKeyList = new int[joinKeyPairs.size()];
for (int i = 0; i < joinKeyPairs.size(); i++) {
- outerKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+ leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
}
for (int i = 0; i < joinKeyPairs.size(); i++) {
- innerKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+ rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
}
// for projection
@@ -90,56 +90,56 @@ public class HashJoinExec extends BinaryPhysicalExec {
// for join
frameTuple = new FrameTuple();
outTuple = new VTuple(outSchema.getColumnNum());
- outerKeyTuple = new VTuple(outerKeyList.length);
+ leftKeyTuple = new VTuple(leftKeyList.length);
}
- private void getKeyOuterTuple(final Tuple outerTuple, Tuple keyTuple) {
- for (int i = 0; i < outerKeyList.length; i++) {
- keyTuple.put(i, outerTuple.get(outerKeyList[i]));
+ protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+ for (int i = 0; i < leftKeyList.length; i++) {
+ keyTuple.put(i, outerTuple.get(leftKeyList[i]));
}
}
public Tuple next() throws IOException {
if (first) {
- loadInnerTable();
+ loadRightToHashTable();
}
- Tuple innerTuple;
+ Tuple rightTuple;
boolean found = false;
while(!finished) {
- if (nextOuter) {
+ if (shouldGetLeftTuple) { // initially, it is true.
// getting new outer
- outerTuple = outerChild.next();
- if (outerTuple == null) {
+ leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
finished = true;
return null;
}
- // getting corresponding inner
- getKeyOuterTuple(outerTuple, outerKeyTuple);
- if (tupleSlots.containsKey(outerKeyTuple)) {
- iterator = tupleSlots.get(outerKeyTuple).iterator();
- nextOuter = false;
+ // getting corresponding right
+ getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+ if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+ iterator = tupleSlots.get(leftKeyTuple).iterator();
+ shouldGetLeftTuple = false;
} else {
- nextOuter = true;
+ shouldGetLeftTuple = true;
continue;
}
}
- // getting next inner tuple
- innerTuple = iterator.next();
- frameTuple.set(outerTuple, innerTuple);
+ // getting a next right tuple on in-memory hash table.
+ rightTuple = iterator.next();
+ frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
joinQual.eval(qualCtx, inSchema, frameTuple);
- if (joinQual.terminate(qualCtx).asBool()) {
+ if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
projector.eval(evalContexts, frameTuple);
projector.terminate(evalContexts, outTuple);
found = true;
}
- if (!iterator.hasNext()) { // no more inner tuple
- nextOuter = true;
+ if (!iterator.hasNext()) { // no more right tuples for this hash key
+ shouldGetLeftTuple = true;
}
if (found) {
@@ -150,15 +150,15 @@ public class HashJoinExec extends BinaryPhysicalExec {
return outTuple;
}
- private void loadInnerTable() throws IOException {
+ protected void loadRightToHashTable() throws IOException {
Tuple tuple;
Tuple keyTuple;
- while ((tuple = innerChild.next()) != null) {
+ while ((tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
List<Tuple> newValue;
- for (int i = 0; i < innerKeyList.length; i++) {
- keyTuple.put(i, tuple.get(innerKeyList[i]));
+ for (int i = 0; i < rightKeyList.length; i++) {
+ keyTuple.put(i, tuple.get(rightKeyList[i]));
}
if (tupleSlots.containsKey(keyTuple)) {
@@ -183,7 +183,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
finished = false;
iterator = null;
- nextOuter = true;
+ shouldGetLeftTuple = true;
}
public void close() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 0179da5..1820c32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -96,7 +96,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
outTuple = new VTuple(outSchema.getColumnNum());
}
- public JoinNode getJoinNode(){
+ public JoinNode getPlan(){
return this.joinNode;
}
@@ -110,10 +110,10 @@ public class MergeJoinExec extends BinaryPhysicalExec {
}
if(outerTuple == null){
- outerTuple = outerChild.next();
+ outerTuple = leftChild.next();
}
if(innerTuple == null){
- innerTuple = innerChild.next();
+ innerTuple = rightChild.next();
}
outerTupleSlots.clear();
@@ -122,9 +122,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
int cmp;
while ((cmp = joincomparator.compare(outerTuple, innerTuple)) != 0) {
if (cmp > 0) {
- innerTuple = innerChild.next();
+ innerTuple = rightChild.next();
} else if (cmp < 0) {
- outerTuple = outerChild.next();
+ outerTuple = leftChild.next();
}
if (innerTuple == null || outerTuple == null) {
return null;
@@ -134,7 +134,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
previous = outerTuple;
do {
outerTupleSlots.add(outerTuple);
- outerTuple = outerChild.next();
+ outerTuple = leftChild.next();
if (outerTuple == null) {
end = true;
break;
@@ -146,7 +146,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
previous = innerTuple;
do {
innerTupleSlots.add(innerTuple);
- innerTuple = innerChild.next();
+ innerTuple = rightChild.next();
if (innerTuple == null) {
end = true;
break;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 3ce25ea..15d8ff9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -74,17 +74,17 @@ public class NLJoinExec extends BinaryPhysicalExec {
public Tuple next() throws IOException {
for (;;) {
if (needNewOuter) {
- outerTuple = outerChild.next();
+ outerTuple = leftChild.next();
if (outerTuple == null) {
return null;
}
needNewOuter = false;
}
- innerTuple = innerChild.next();
+ innerTuple = rightChild.next();
if (innerTuple == null) {
needNewOuter = true;
- innerChild.rescan();
+ rightChild.rescan();
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
index 7a757ca..6af08a5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
@@ -42,7 +42,7 @@ public class UnionExec extends BinaryPhysicalExec {
@Override
public Tuple next() throws IOException {
if (nextOuter) {
- tuple = outerChild.next();
+ tuple = leftChild.next();
if (tuple == null) {
nextOuter = false;
} else {
@@ -50,7 +50,7 @@ public class UnionExec extends BinaryPhysicalExec {
}
}
- return innerChild.next();
+ return rightChild.next();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 1ae4c1c..75d13ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -115,7 +115,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
} else {
joinNode.setJoinQual(qual);
}
- if (joinNode.getJoinType() == JoinType.CROSS_JOIN) {
+ if (joinNode.getJoinType() == JoinType.CROSS) {
joinNode.setJoinType(JoinType.INNER);
}
cnf.removeAll(matched);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index c5b1c05..bea1eb6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -129,7 +129,7 @@ public class TestBNLJoinExec {
"inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
@Test
- public final void testCrossJoin() throws IOException {
+ public final void testBNLCrossJoin() throws IOException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestBNLJoinExec {
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
TUtil.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[0]);
@@ -148,8 +148,8 @@ public class TestBNLJoinExec {
ProjectionExec proj = (ProjectionExec) exec;
NLJoinExec nlJoin = (NLJoinExec) proj.getChild();
- SeqScanExec scanOuter = (SeqScanExec) nlJoin.getOuterChild();
- SeqScanExec scanInner = (SeqScanExec) nlJoin.getInnerChild();
+ SeqScanExec scanOuter = (SeqScanExec) nlJoin.getLeftChild();
+ SeqScanExec scanInner = (SeqScanExec) nlJoin.getRightChild();
BNLJoinExec bnl = new BNLJoinExec(ctx, nlJoin.getPlan(), scanOuter, scanInner);
proj.setChild(bnl);
@@ -164,7 +164,7 @@ public class TestBNLJoinExec {
}
@Test
- public final void testInnerJoin() throws IOException {
+ public final void testBNLInnerJoin() throws IOException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -172,7 +172,7 @@ public class TestBNLJoinExec {
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
TaskAttemptContext ctx =
new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
merged, workDir);
@@ -189,15 +189,15 @@ public class TestBNLJoinExec {
JoinNode joinNode = null;
if (proj.getChild() instanceof MergeJoinExec) {
MergeJoinExec join = (MergeJoinExec) proj.getChild();
- ExternalSortExec sortOut = (ExternalSortExec) join.getOuterChild();
- ExternalSortExec sortIn = (ExternalSortExec) join.getInnerChild();
+ ExternalSortExec sortOut = (ExternalSortExec) join.getLeftChild();
+ ExternalSortExec sortIn = (ExternalSortExec) join.getRightChild();
scanOuter = (SeqScanExec) sortOut.getChild();
scanInner = (SeqScanExec) sortIn.getChild();
- joinNode = join.getJoinNode();
+ joinNode = join.getPlan();
} else if (proj.getChild() instanceof HashJoinExec) {
HashJoinExec join = (HashJoinExec) proj.getChild();
- scanOuter = (SeqScanExec) join.getOuterChild();
- scanInner = (SeqScanExec) join.getInnerChild();
+ scanOuter = (SeqScanExec) join.getLeftChild();
+ scanInner = (SeqScanExec) join.getRightChild();
joinNode = join.getPlan();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
new file mode 100644
index 0000000..a429cf2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link HashAntiJoinExec} by anti-joining employee against people on empId. */
+public class TestHashAntiJoinExec {
+  // Dedicated fixture directory for this test class.
+  private static final String TEST_PATH = "target/test-data/TestHashAntiJoinExec";
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /** Creates and registers the CSV tables employee (empId 0-9) and people (odd empId 1-9). */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema, StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+  }
+
+  /** Shuts down the catalog cluster started by {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  /** Swaps the planned join for a {@link HashAntiJoinExec}; only empIds 0, 2, 4, 6, 8 may remain. */
+  @Test
+  public final void testHashAntiJoin() throws IOException, PlanningException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // Replace the planned equi-join with a hash anti-join over the same scans.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+      exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    }
+    // Fail fast when the planner produced a join this test does not swap out.
+    assertTrue("unexpected physical plan: " + exec.getClass(), exec instanceof HashAntiJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertEquals(i, tuple.getInt(0).asInt4());
+      assertEquals(i, tuple.getInt(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
+      assertEquals("dept_" + i, tuple.getString(2).asChars());
+      assertEquals(10 + i, tuple.getInt(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result : [0, 2, 4, 6, 8]
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 591f431..9422358 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -129,7 +129,7 @@ public class TestHashJoinExec {
};
@Test
- public final void testInnerJoin() throws IOException {
+ public final void testHashInnerJoin() throws IOException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestHashJoinExec {
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
TUtil.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[0]);
@@ -149,12 +149,12 @@ public class TestHashJoinExec {
ProjectionExec proj = (ProjectionExec) exec;
if (proj.getChild() instanceof MergeJoinExec) {
MergeJoinExec join = (MergeJoinExec) proj.getChild();
- ExternalSortExec sortout = (ExternalSortExec) join.getOuterChild();
- ExternalSortExec sortin = (ExternalSortExec) join.getInnerChild();
+ ExternalSortExec sortout = (ExternalSortExec) join.getLeftChild();
+ ExternalSortExec sortin = (ExternalSortExec) join.getRightChild();
SeqScanExec scanout = (SeqScanExec) sortout.getChild();
SeqScanExec scanin = (SeqScanExec) sortin.getChild();
- HashJoinExec hashjoin = new HashJoinExec(ctx, join.getJoinNode(), scanout, scanin);
+ HashJoinExec hashjoin = new HashJoinExec(ctx, join.getPlan(), scanout, scanin);
proj.setChild(hashjoin);
exec = proj;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index b85d1fa..9d85970 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -145,7 +145,7 @@ public class TestMergeJoinExec {
};
@Test
- public final void testInnerJoin() throws IOException {
+ public final void testMergeInnerJoin() throws IOException {
Fragment[] empFrags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = sm.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -153,7 +153,7 @@ public class TestMergeJoinExec {
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
TUtil.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[0]);
@@ -167,8 +167,8 @@ public class TestMergeJoinExec {
// TODO - should be planed with user's optimization hint
if (!(proj.getChild() instanceof MergeJoinExec)) {
BinaryPhysicalExec nestedLoopJoin = (BinaryPhysicalExec) proj.getChild();
- SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getOuterChild();
- SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getInnerChild();
+ SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getLeftChild();
+ SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getRightChild();
SeqScanExec tmp;
if (!outerScan.getTableName().equals("employee")) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index c66b0ca..7235924 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -129,7 +129,7 @@ public class TestNLJoinExec {
};
@Test
- public final void testCrossJoin() throws IOException {
+ public final void testNLCrossJoin() throws IOException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestNLJoinExec {
Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
TUtil.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[0]);
@@ -156,7 +156,7 @@ public class TestNLJoinExec {
}
@Test
- public final void testInnerJoin() throws IOException {
+ public final void testNLInnerJoin() throws IOException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -164,7 +164,7 @@ public class TestNLJoinExec {
Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
TUtil.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[1]);