You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/05/15 05:59:28 UTC
[2/2] tajo git commit: TAJO-1542 Refactoring of HashJoinExecs.
(contributed by navis, committed by hyunsik)
TAJO-1542 Refactoring of HashJoinExecs. (contributed by navis, committed by hyunsik)
Closes #529 #567
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36a703c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36a703c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36a703c5
Branch: refs/heads/master
Commit: 36a703c5dc2c2257dfd52232f204507fb4b79024
Parents: f3acbdf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 14 20:59:09 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 14 20:59:09 2015 -0700
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/catalog/Schema.java | 16 +
.../org/apache/tajo/storage/EmptyTuple.java | 140 +-----
.../java/org/apache/tajo/storage/NullTuple.java | 175 +++++++
.../java/org/apache/tajo/storage/VTuple.java | 20 +-
.../engine/planner/PhysicalPlannerImpl.java | 22 +-
.../physical/BasicPhysicalExecutorVisitor.java | 8 -
.../planner/physical/CommonHashJoinExec.java | 191 ++++++++
.../engine/planner/physical/CommonJoinExec.java | 172 ++++++-
.../planner/physical/HashFullOuterJoinExec.java | 247 ++++------
.../engine/planner/physical/HashJoinExec.java | 212 +--------
.../planner/physical/HashLeftAntiJoinExec.java | 59 +--
.../planner/physical/HashLeftOuterJoinExec.java | 292 +-----------
.../planner/physical/HashLeftSemiJoinExec.java | 48 +-
.../planner/physical/NLLeftOuterJoinExec.java | 101 ----
.../physical/PhysicalExecutorVisitor.java | 3 -
.../physical/RightOuterMergeJoinExec.java | 40 +-
.../apache/tajo/engine/utils/CacheHolder.java | 3 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../physical/TestLeftOuterHashJoinExec.java | 104 ++--
.../physical/TestLeftOuterNLJoinExec.java | 474 -------------------
.../testJoinFilterOfRowPreservedTable1.sql | 2 +-
.../testJoinFilterOfRowPreservedTable1.result | 2 +-
.../plan/expr/AggregationFunctionCallEval.java | 4 +-
.../apache/tajo/plan/expr/AlgebraicUtil.java | 5 +
.../org/apache/tajo/plan/expr/EvalNode.java | 39 +-
.../java/org/apache/tajo/plan/expr/InEval.java | 2 +-
.../plan/expr/PatternMatchPredicateEval.java | 2 +-
.../tajo/plan/expr/WindowFunctionEval.java | 2 +-
.../org/apache/tajo/storage/FrameTuple.java | 14 +-
30 files changed, 842 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 77be589..44ae4b4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1542: Refactoring of HashJoinExecs. (Contributed Navis, Committed by
+ hyunsik)
+
TAJO-1591: Change StoreType represented as Enum to String type. (hyunsik)
TAJO-1452: Improve function listing order (Contributed Dongjoon Hyun,
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 0e4b741..80c4d83 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -400,6 +400,22 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
return containFlag;
}
+ /**
+ * Return TRUE if any column in <code>columns</code> is included in this schema.
+ *
+ * @param columns Columns to be checked
+ * @return true if any column in <code>columns</code> is included in this schema.
+ * Otherwise, false.
+ */
+ public boolean containsAny(Collection<Column> columns) {
+ for (Column column : columns) {
+ if (contains(column)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public synchronized Schema addColumn(String name, TypeDesc typeDesc) {
String normalized = name;
if(fieldsByQualifiedName.containsKey(normalized)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
index 89e72ed..cdcebd7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
@@ -18,17 +18,12 @@
package org.apache.tajo.storage;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
/* This class doesn’t have content datum. if selected column is zero, this is useful
* e.g. select count(*) from table
* */
-public class EmptyTuple implements Tuple, Cloneable {
+public class EmptyTuple extends NullTuple {
private static EmptyTuple tuple;
- private static Datum[] EMPTY_VALUES = new Datum[0];
static {
tuple = new EmptyTuple();
@@ -39,138 +34,11 @@ public class EmptyTuple implements Tuple, Cloneable {
}
private EmptyTuple() {
+ super(0);
}
@Override
- public int size() {
- return 0;
- }
-
- public boolean contains(int fieldId) {
- return false;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return true;
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return false;
- }
-
- @Override
- public void clear() {
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void put(Datum[] values) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Datum get(int fieldId) {
- return NullDatum.get();
- }
-
- @Override
- public void setOffset(long offset) {
-
- }
-
- @Override
- public long getOffset() {
- return -1;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return NullDatum.get().asBool();
- }
-
- @Override
- public byte getByte(int fieldId) {
- return NullDatum.get().asByte();
- }
-
- @Override
- public char getChar(int fieldId) {
- return NullDatum.get().asChar();
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- return NullDatum.get().asByteArray();
- }
-
- @Override
- public short getInt2(int fieldId) {
- return NullDatum.get().asInt2();
- }
-
- @Override
- public int getInt4(int fieldId) {
- return NullDatum.get().asInt4();
- }
-
- @Override
- public long getInt8(int fieldId) {
- return NullDatum.get().asInt8();
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return NullDatum.get().asFloat4();
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return NullDatum.get().asFloat8();
- }
-
- @Override
- public String getText(int fieldId) {
- return NullDatum.get().asChars();
- }
-
- @Override
- public ProtobufDatum getProtobufDatum(int fieldId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Datum getInterval(int fieldId) {
- return NullDatum.get();
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- return NullDatum.get().asUnicodeChars();
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- throw new CloneNotSupportedException();
- }
-
- @Override
- public Datum[] getValues() {
- return EMPTY_VALUES;
+ public Tuple clone() {
+ return this;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
new file mode 100644
index 0000000..45eb859
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+import java.util.Arrays;
+
+/**
+ * A tuple of a fixed width whose every field is null. It is used for outer joins.
+ *
+ * <p>Instances cannot be modified: every {@code put} variant throws
+ * {@link UnsupportedOperationException} and {@link #clear()} does nothing.
+ */
+public class NullTuple implements Tuple, Cloneable {
+
+  /**
+   * Creates a tuple with {@code size} null fields.
+   *
+   * @param size the number of fields; must not be negative
+   * @return a new null tuple
+   * @throws IllegalArgumentException if {@code size} is negative
+   */
+  public static Tuple create(int size) {
+    return new NullTuple(size);
+  }
+
+  private final int size;
+
+  NullTuple(int size) {
+    // Reject bad widths up front instead of failing later in getValues()
+    // with a NegativeArraySizeException.
+    if (size < 0) {
+      throw new IllegalArgumentException("Tuple size must not be negative: " + size);
+    }
+    this.size = size;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /** Returns true only for a field index in the range [0, size). */
+  public boolean contains(int fieldId) {
+    return fieldId >= 0 && fieldId < size;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return true;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return false;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to clear: all fields are always null
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    return NullDatum.get();
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    // ignored: this tuple has no backing storage position
+  }
+
+  @Override
+  public long getOffset() {
+    // -1 ("no offset") keeps the value EmptyTuple returned before it started
+    // inheriting this method from NullTuple.
+    return -1;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return NullDatum.get().asBool();
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return NullDatum.get().asByte();
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return NullDatum.get().asChar();
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    return NullDatum.get().asByteArray();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return NullDatum.get().asInt2();
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return NullDatum.get().asInt4();
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return NullDatum.get().asInt8();
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return NullDatum.get().asFloat4();
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return NullDatum.get().asFloat8();
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return NullDatum.get().asChars();
+  }
+
+  @Override
+  public ProtobufDatum getProtobufDatum(int fieldId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Datum getInterval(int fieldId) {
+    return NullDatum.get();
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return NullDatum.get().asUnicodeChars();
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return new NullTuple(size);
+  }
+
+  /** Returns a freshly allocated array of {@code size} NullDatum entries. */
+  @Override
+  public Datum[] getValues() {
+    Datum[] datum = new Datum[size];
+    Arrays.fill(datum, NullDatum.get());
+    return datum;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 5e839b7..da69eb0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -201,6 +201,7 @@ public class VTuple implements Tuple, Cloneable {
return tuple;
}
+  /** Renders this tuple's values with toDisplayString(getValues()). */
+ @Override
public String toString() {
return toDisplayString(getValues());
}
@@ -225,22 +226,15 @@ public class VTuple implements Tuple, Cloneable {
}
public static String toDisplayString(Datum [] values) {
- boolean first = true;
StringBuilder str = new StringBuilder();
- str.append("(");
- for(int i=0; i < values.length; i++) {
- if(values[i] != null) {
- if(first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(values[i]);
+ str.append('(');
+ for (Datum datum : values) {
+ if (str.length() > 1) {
+ str.append(',');
}
+ str.append(datum);
}
- str.append(")");
+ str.append(')');
return str.toString();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 506b03e..978dde8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -466,14 +466,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
case IN_MEMORY_HASH_JOIN:
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
- case NESTED_LOOP_JOIN:
- //the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ case MERGE_JOIN:
+ //the right operand is too large, so we opt for merge join implementation
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+ return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
default:
LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
- LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
+ return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
}
} else {
return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
@@ -500,9 +500,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
}
else {
- //the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ //the right operand is too large, so we opt for merge join implementation
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+ return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
}
}
@@ -566,7 +566,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
default:
LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
- LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
} else {
@@ -589,7 +589,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
default:
LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
- LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index 42611b0..c2d93bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -65,8 +65,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
return visitMergeJoin(context, (MergeJoinExec) exec, stack);
} else if (exec instanceof NLJoinExec) {
return visitNLJoin(context, (NLJoinExec) exec, stack);
- } else if (exec instanceof NLLeftOuterJoinExec) {
- return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
} else if (exec instanceof ProjectionExec) {
return visitProjection(context, (ProjectionExec) exec, stack);
} else if (exec instanceof RangeShuffleFileWriteExec) {
@@ -214,12 +212,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
}
@Override
- public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
- throws PhysicalPlanningException {
- return visitBinaryExecutor(context, exec, stack);
- }
-
- @Override
public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException {
return visitUnaryExecutor(context, exec, stack);
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
new file mode 100644
index 0000000..ff9b253
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * common exec for all hash join execs
+ *
+ * @param <T> Tuple collection type to load small relation onto in-memory
+ */
+public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
+
+ protected final List<Column[]> joinKeyPairs;
+
+ // temporal tuples and states for nested loop join
+ protected boolean first = true;
+ protected Map<Tuple, T> tupleSlots;
+
+ protected Iterator<Tuple> iterator;
+
+ protected final Tuple keyTuple;
+
+ protected final int rightNumCols;
+ protected final int leftNumCols;
+
+ protected final int[] leftKeyList;
+ protected final int[] rightKeyList;
+
+ protected boolean finished;
+
+ public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
+ super(context, plan, outer, inner);
+
+ // HashJoin only can manage equi join key pairs.
+ this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
+ inner.getSchema(), false);
+
+ leftKeyList = new int[joinKeyPairs.size()];
+ rightKeyList = new int[joinKeyPairs.size()];
+
+ for (int i = 0; i < joinKeyPairs.size(); i++) {
+ leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+ }
+
+ for (int i = 0; i < joinKeyPairs.size(); i++) {
+ rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+ }
+
+ leftNumCols = outer.getSchema().size();
+ rightNumCols = inner.getSchema().size();
+
+ keyTuple = new VTuple(leftKeyList.length);
+ }
+
+ protected void loadRightToHashTable() throws IOException {
+ ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+ if (scanExec.canBroadcast()) {
+ /* If this table can broadcast, all tasks in a node will share the same cache */
+ TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+ context, scanExec.getCanonicalName(), scanExec.getFragments());
+ loadRightFromCache(key);
+ } else {
+ this.tupleSlots = convert(buildRightToHashTable(), false);
+ }
+
+ first = false;
+ }
+
+ protected void loadRightFromCache(TableCacheKey key) throws IOException {
+ ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+
+ CacheHolder<Map<Tuple, List<Tuple>>> holder;
+ synchronized (sharedResource.getLock()) {
+ if (sharedResource.hasBroadcastCache(key)) {
+ holder = sharedResource.getBroadcastCache(key);
+ } else {
+ Map<Tuple, List<Tuple>> built = buildRightToHashTable();
+ holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null);
+ sharedResource.addBroadcastCache(key, holder);
+ }
+ }
+ this.tupleSlots = convert(holder.getData(), true);
+ }
+
+ protected Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
+ Tuple tuple;
+ Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
+
+ while (!context.isStopped() && (tuple = rightChild.next()) != null) {
+ Tuple keyTuple = new VTuple(joinKeyPairs.size());
+ for (int i = 0; i < rightKeyList.length; i++) {
+ keyTuple.put(i, tuple.get(rightKeyList[i]));
+ }
+
+ List<Tuple> newValue = map.get(keyTuple);
+ if (newValue == null) {
+ map.put(keyTuple, newValue = new ArrayList<Tuple>());
+ }
+ // if source is scan or groupby, it needs not to be cloned
+ newValue.add(new VTuple(tuple));
+ }
+ return map;
+ }
+
+ // todo: convert loaded data to cache condition
+ protected abstract Map<Tuple, T> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+ throws IOException;
+
+ protected Tuple toKey(final Tuple outerTuple) {
+ for (int i = 0; i < leftKeyList.length; i++) {
+ keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+ }
+ return keyTuple;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+ finished = false;
+ iterator = null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ iterator = null;
+ if (tupleSlots != null) {
+ tupleSlots.clear();
+ tupleSlots = null;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (leftChild == null) {
+ return inputStats;
+ }
+ TableStats leftInputStats = leftChild.getInputStats();
+ inputStats.setNumBytes(0);
+ inputStats.setReadBytes(0);
+ inputStats.setNumRows(0);
+
+ if (leftInputStats != null) {
+ inputStats.setNumBytes(leftInputStats.getNumBytes());
+ inputStats.setReadBytes(leftInputStats.getReadBytes());
+ inputStats.setNumRows(leftInputStats.getNumRows());
+ }
+
+ TableStats rightInputStats = rightChild.getInputStats();
+ if (rightInputStats != null) {
+ inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+ inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+ inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+ }
+
+ return inputStats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
index 2535edf..ec29085 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
@@ -18,36 +18,178 @@
package org.apache.tajo.engine.planner.physical;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.expr.AlgebraicUtil;
+import org.apache.tajo.plan.expr.BinaryEval;
import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
-// common join exec except HashLeftOuterJoinExec
+/**
+ * common exec for all join execs
+ */
public abstract class CommonJoinExec extends BinaryPhysicalExec {
// from logical plan
protected JoinNode plan;
protected final boolean hasJoinQual;
- protected EvalNode joinQual;
+ protected EvalNode joinQual; // ex) a.id = b.id
+ protected EvalNode leftJoinFilter; // ex) a > 10
+ protected EvalNode rightJoinFilter; // ex) b > 5
+
+ protected final Schema leftSchema;
+ protected final Schema rightSchema;
+
+ protected final FrameTuple frameTuple;
+ protected final Tuple outTuple;
// projection
protected Projector projector;
- public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
- PhysicalExec inner) {
+  /**
+   * Creates a join executor over a left (outer) and right (inner) child. When the plan has a
+   * join qual, it is split into the join condition proper (joinQual) and filters that reference
+   * only one input (leftJoinFilter / rightJoinFilter).
+   */
+ public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
plan.getOutSchema(), outer, inner);
this.plan = plan;
- this.joinQual = plan.getJoinQual();
- this.hasJoinQual = plan.hasJoinQual();
+ this.leftSchema = outer.getSchema();
+ this.rightSchema = inner.getSchema();
+    // split the ON condition: [0] join condition, [1] left filter, [2] right filter
+ if (plan.hasJoinQual()) {
+ EvalNode[] extracted = extractJoinConditions(plan.getJoinQual(), leftSchema, rightSchema);
+ joinQual = extracted[0];
+ leftJoinFilter = extracted[1];
+ rightJoinFilter = extracted[2];
+ }
+    // reflects the extracted join condition, which can be null even if plan.hasJoinQual()
+ this.hasJoinQual = joinQual != null;
// for projection
this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+
+ // for join
+ this.frameTuple = new FrameTuple();
+ this.outTuple = new VTuple(outSchema.size());
+ }
+
+  /**
+   * Separates a CNF-formed join condition into a join condition, a left join filter, and a
+   * right join filter.
+   *
+   * <p>Each conjunct is classified by the columns it references. Conjuncts that only reference
+   * the left input become the left join filter, and conjuncts that only reference the right
+   * input become the right join filter. Everything else stays in the join condition: conjuncts
+   * spanning both inputs, including ones mixing both sides in a single operand, and conjuncts
+   * referencing no column. No conjunct is dropped, so non-binary predicates (e.g. IS NULL,
+   * BETWEEN, NOT) are still applied.
+   *
+   * @param joinQual the original join condition
+   * @param leftSchema Left table schema
+   * @param rightSchema Right table schema
+   * @return Three element EvalNodes, 0 - join condition, 1 - left join filter, 2 - right join filter.
+   *         An element is null when no conjunct falls into that category.
+   */
+  private EvalNode[] extractJoinConditions(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    List<EvalNode> joinQuals = Lists.newArrayList();
+    List<EvalNode> leftFilters = Lists.newArrayList();
+    List<EvalNode> rightFilters = Lists.newArrayList();
+    for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) {
+      LinkedHashSet<Column> columns = EvalTreeUtil.findUniqueColumns(eachQual);
+      boolean columnsFromLeft = leftSchema.containsAny(columns);
+      boolean columnsFromRight = rightSchema.containsAny(columns);
+
+      if (columnsFromLeft && !columnsFromRight) {
+        leftFilters.add(eachQual);
+      } else if (columnsFromRight && !columnsFromLeft) {
+        rightFilters.add(eachQual);
+      } else {
+        // Spans both inputs, or references no column at all: it must be evaluated against the
+        // joined tuple, so keep it in the join condition instead of silently discarding it.
+        joinQuals.add(eachQual);
+      }
+    }
+    return new EvalNode[] {
+        joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals),
+        leftFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(leftFilters),
+        rightFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(rightFilters)
+    };
+  }
+
+  /** Returns the logical join node this executor was built from; null after close(). */
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Tests whether a left tuple is rejected by the left join filter.
+   *
+   * @param left Tuple to be evaluated
+   * @return true if a left join filter exists and it evaluates to false (per asBool()) for the
+   *         tuple; false if there is no left join filter or the tuple passes it.
+   */
+  protected boolean leftFiltered(Tuple left) {
+    if (leftJoinFilter == null) {
+      return false; // no filter: nothing is rejected
+    }
+    boolean passes = leftJoinFilter.eval(left).asBool();
+    return !passes;
+  }
+
+  /**
+   * Tests whether a right tuple is rejected by the right join filter.
+   *
+   * @param right Tuple to be evaluated
+   * @return true if a right join filter exists and it evaluates to false (per asBool()) for the
+   *         tuple; false if there is no right join filter or the tuple passes it.
+   */
+  protected boolean rightFiltered(Tuple right) {
+    if (rightJoinFilter == null) {
+      return false; // no filter: nothing is rejected
+    }
+    boolean passes = rightJoinFilter.eval(right).asBool();
+    return !passes;
+  }
+
+  /**
+   * Returns an iterator over the given right-table rows that skips every row rejected by the
+   * right join filter. A null input yields an empty iterator. Without a right join filter, the
+   * rows are returned unfiltered.
+   *
+   * @param rightTuples rows taken from the right table; may be null
+   * @return an iterator over the rows that pass the right join filter
+   */
+  protected Iterator<Tuple> rightFiltered(Iterable<Tuple> rightTuples) {
+    if (rightTuples == null) {
+      return Iterators.emptyIterator();
+    }
+    Iterator<Tuple> candidates = rightTuples.iterator();
+    if (rightJoinFilter == null) {
+      return candidates;
+    }
+    Predicate<Tuple> passesRightFilter = new Predicate<Tuple>() {
+      @Override
+      public boolean apply(Tuple input) {
+        return rightJoinFilter.eval(input).asBool();
+      }
+    };
+    return Iterators.filter(candidates, passesRightFilter);
+  }
+
+  /**
+   * Returns an iterator that yields exactly one tuple whose fields are all null
+   * (presumably used to pad the unmatched side of an outer join).
+   *
+   * @param width the number of fields of the null tuple
+   * @return a single-element iterator over a NullTuple of the given width
+   */
+  protected Iterator<Tuple> nullIterator(int width) {
+    Tuple nullRow = NullTuple.create(width);
+    List<Tuple> singleRow = Arrays.asList(nullRow);
+    return singleRow.iterator();
}
@Override
@@ -56,6 +198,12 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
if (hasJoinQual) {
joinQual.bind(context.getEvalContext(), inSchema);
}
+ if (leftJoinFilter != null) {
+ leftJoinFilter.bind(context.getEvalContext(), leftSchema);
+ }
+ if (rightJoinFilter != null) {
+ rightJoinFilter.bind(context.getEvalContext(), rightSchema);
+ }
}
@Override
@@ -63,10 +211,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
if (hasJoinQual) {
joinQual = context.getPrecompiledEval(inSchema, joinQual);
}
- }
-
- public JoinNode getPlan() {
- return plan;
+ // compile filters?
}
@Override
@@ -74,6 +219,13 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
super.close();
plan = null;
joinQual = null;
+ leftJoinFilter = null;
+ rightJoinFilter = null;
projector = null;
}
+
+ /**
+ * Returns a debug representation: the concrete executor class name followed by
+ * the left and right input schemas.
+ */
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " [" + leftSchema + " : " + rightSchema + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 6e28ae0..1645263 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,101 +18,59 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
+public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List<Tuple>>> {
-public class HashFullOuterJoinExec extends CommonJoinExec {
-
- protected List<Column[]> joinKeyPairs;
-
- // temporal tuples and states for nested loop join
- protected boolean first = true;
- protected FrameTuple frameTuple;
- protected Tuple outTuple = null;
- protected Map<Tuple, List<Tuple>> tupleSlots;
- protected Iterator<Tuple> iterator = null;
- protected Tuple leftTuple;
- protected Tuple leftKeyTuple;
-
- protected int [] leftKeyList;
- protected int [] rightKeyList;
-
- protected boolean finished = false;
- protected boolean shouldGetLeftTuple = true;
-
- private int rightNumCols;
- private int leftNumCols;
- private Map<Tuple, Boolean> matched;
+ private boolean finalLoop; // final loop for right unmatched
public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
PhysicalExec inner) {
super(context, plan, outer, inner);
- this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
- // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
- // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
- this.matched = new HashMap<Tuple, Boolean>(10000);
-
- // HashJoin only can manage equi join key pairs.
- this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(),
- false);
-
- leftKeyList = new int[joinKeyPairs.size()];
- rightKeyList = new int[joinKeyPairs.size()];
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
- }
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
- }
-
- // for join
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.size());
- leftKeyTuple = new VTuple(leftKeyList.length);
-
- leftNumCols = outer.getSchema().size();
- rightNumCols = inner.getSchema().size();
}
- protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
- for (int i = 0; i < leftKeyList.length; i++) {
- keyTuple.put(i, outerTuple.get(leftKeyList[i]));
- }
- }
+ public Iterator<Tuple> getUnmatchedRight() {
- public Tuple getNextUnmatchedRight() {
+ return new Iterator<Tuple>() {
- List<Tuple> newValue;
- Tuple returnedTuple;
- // get a keyTUple from the matched hashmap with a boolean false value
- for(Tuple aKeyTuple : matched.keySet()) {
- if(matched.get(aKeyTuple) == false) {
- newValue = tupleSlots.get(aKeyTuple);
- returnedTuple = newValue.remove(0);
- tupleSlots.put(aKeyTuple, newValue);
+ private Iterator<Pair<Boolean, List<Tuple>>> iterator1 = tupleSlots.values().iterator();
+ private Iterator<Tuple> iterator2;
- // after taking the last element from the list in tupleSlots, set flag true in matched as well
- if(newValue.isEmpty()){
- matched.put(aKeyTuple, true);
+ @Override
+ public boolean hasNext() {
+ if (hasMore()) {
+ return true;
}
+ for (iterator2 = null; !hasMore() && iterator1.hasNext();) {
+ Pair<Boolean, List<Tuple>> next = iterator1.next();
+ if (!next.getFirst()) {
+ iterator2 = next.getSecond().iterator();
+ }
+ }
+ return hasMore();
+ }
- return returnedTuple;
+ private boolean hasMore() {
+ return iterator2 != null && iterator2.hasNext();
}
- }
- return null;
+
+ @Override
+ public Tuple next() {
+ return iterator2.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ };
}
public Tuple next() throws IOException {
@@ -120,112 +78,67 @@ public class HashFullOuterJoinExec extends CommonJoinExec {
loadRightToHashTable();
}
- Tuple rightTuple;
- boolean found = false;
-
- while(!context.isStopped() && !finished) {
- if (shouldGetLeftTuple) { // initially, it is true.
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
- Tuple unmatchedRightTuple = getNextUnmatchedRight();
- if( unmatchedRightTuple == null) {
- finished = true;
- outTuple = null;
- return null;
- } else {
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
- frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
- projector.eval(frameTuple, outTuple);
-
- return outTuple;
- }
- }
-
- // getting corresponding right
- getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
- List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
- if (rightTuples != null) { // found right tuples on in-memory hash table.
- iterator = rightTuples.iterator();
- shouldGetLeftTuple = false;
- } else {
- //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
- //output a tuple with the nulls padded rightTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(frameTuple, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- shouldGetLeftTuple = true;
- return outTuple;
- }
- }
-
- // getting a next right tuple on in-memory hash table.
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-
- if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable
+ while (!context.isStopped() && !finished) {
+ if (iterator != null && iterator.hasNext()) {
+ frameTuple.setRight(iterator.next());
projector.eval(frameTuple, outTuple);
- found = true;
- getKeyLeftTuple(leftTuple, leftKeyTuple);
- matched.put(leftKeyTuple, true);
+ return outTuple;
}
-
- if (!iterator.hasNext()) { // no more right tuples for this hash key
- shouldGetLeftTuple = true;
+ if (finalLoop) {
+ finished = true;
+ return null;
}
-
- if (found) {
- break;
+ Tuple leftTuple = leftChild.next();
+ if (leftTuple == null) {
+ // no more left tuples: the probe phase is complete.
+ // now emit the right tuples whose join key was never marked as matched (still in tupleSlots), null-padded on the left side
+ frameTuple.setLeft(NullTuple.create(leftNumCols));
+ iterator = getUnmatchedRight();
+ finalLoop = true;
+ continue;
}
- }
- return outTuple;
- }
-
- protected void loadRightToHashTable() throws IOException {
- Tuple tuple;
- Tuple keyTuple;
+ frameTuple.setLeft(leftTuple);
- while (!context.isStopped() && (tuple = rightChild.next()) != null) {
- keyTuple = new VTuple(joinKeyPairs.size());
- for (int i = 0; i < rightKeyList.length; i++) {
- keyTuple.put(i, tuple.get(rightKeyList[i]));
+ if (leftFiltered(leftTuple)) {
+ iterator = nullIterator(rightNumCols);
+ continue;
}
-
- List<Tuple> newValue = tupleSlots.get(keyTuple);
- if (newValue != null) {
- newValue.add(tuple);
- } else {
- newValue = new ArrayList<Tuple>();
- newValue.add(tuple);
- tupleSlots.put(keyTuple, newValue);
- matched.put(keyTuple,false);
+ // getting corresponding right
+ Pair<Boolean, List<Tuple>> hashed = tupleSlots.get(toKey(leftTuple));
+ if (hashed == null) {
+ iterator = nullIterator(rightNumCols);
+ continue;
+ }
+ Iterator<Tuple> rightTuples = rightFiltered(hashed.getSecond());
+ if (!rightTuples.hasNext()) {
+ iterator = nullIterator(rightNumCols);
+ continue;
}
+ iterator = rightTuples;
+ hashed.setFirst(true); // match found
}
- first = false;
+
+ return null;
}
@Override
- public void rescan() throws IOException {
- super.rescan();
-
- tupleSlots.clear();
- first = true;
-
- finished = false;
- iterator = null;
- shouldGetLeftTuple = true;
+ protected Map<Tuple, Pair<Boolean, List<Tuple>>> convert(Map<Tuple, List<Tuple>> hashed,
+ boolean fromCache) throws IOException {
+ Map<Tuple, Pair<Boolean, List<Tuple>>> tuples = new HashMap<Tuple, Pair<Boolean, List<Tuple>>>(hashed.size());
+ for (Map.Entry<Tuple, List<Tuple>> entry : hashed.entrySet()) {
+ // flag: initially false; set to true once this join key has had at least one match on the counterpart (left) side
+ tuples.put(entry.getKey(), new Pair<Boolean, List<Tuple>>(false, entry.getValue()));
+ }
+ return tuples;
}
@Override
- public void close() throws IOException {
- super.close();
- tupleSlots.clear();
- matched.clear();
- tupleSlots = null;
- matched = null;
- iterator = null;
+ public void rescan() throws IOException {
+ super.rescan();
+ for (Pair<Boolean, List<Tuple>> value : tupleSlots.values()) {
+ value.setFirst(false);
+ }
+ finalLoop = false;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 48f3682..a4215fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,225 +18,59 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.utils.CacheHolder;
-import org.apache.tajo.engine.utils.TableCacheKey;
import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
-public class HashJoinExec extends CommonJoinExec {
-
- protected List<Column[]> joinKeyPairs;
-
- // temporal tuples and states for nested loop join
- protected boolean first = true;
- protected FrameTuple frameTuple;
- protected Tuple outTuple = null;
- protected Map<Tuple, List<Tuple>> tupleSlots;
- protected Iterator<Tuple> iterator = null;
- protected Tuple leftTuple;
- protected Tuple leftKeyTuple;
-
- protected int [] leftKeyList;
- protected int [] rightKeyList;
-
- protected boolean finished = false;
- protected boolean shouldGetLeftTuple = true;
-
- private TableStats cachedRightTableStats;
+public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
PhysicalExec rightExec) {
super(context, plan, leftExec, rightExec);
-
- // HashJoin only can manage equi join key pairs.
- this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
- rightExec.getSchema(), false);
-
- leftKeyList = new int[joinKeyPairs.size()];
- rightKeyList = new int[joinKeyPairs.size()];
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
- }
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
- }
-
- // for join
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.size());
- leftKeyTuple = new VTuple(leftKeyList.length);
}
- protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
- for (int i = 0; i < leftKeyList.length; i++) {
- keyTuple.put(i, outerTuple.get(leftKeyList[i]));
- }
+ @Override
+ protected Map<Tuple, List<Tuple>> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+ throws IOException {
+ return fromCache ? new HashMap<Tuple, List<Tuple>>(hashed) : hashed;
}
+ @Override
public Tuple next() throws IOException {
if (first) {
loadRightToHashTable();
}
- Tuple rightTuple;
- boolean found = false;
-
- while(!context.isStopped() && !finished) {
- if (shouldGetLeftTuple) { // initially, it is true.
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- finished = true;
- return null;
- }
-
- // getting corresponding right
- getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
- List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
- if (rightTuples != null) { // found right tuples on in-memory hash table.
- iterator = rightTuples.iterator();
- shouldGetLeftTuple = false;
- } else {
- shouldGetLeftTuple = true;
- continue;
- }
- }
-
- // getting a next right tuple on in-memory hash table.
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
- if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable
+ while (!context.isStopped() && !finished) {
+ if (iterator != null && iterator.hasNext()) {
+ frameTuple.setRight(iterator.next());
projector.eval(frameTuple, outTuple);
- found = true;
- }
-
- if (!iterator.hasNext()) { // no more right tuples for this hash key
- shouldGetLeftTuple = true;
- }
-
- if (found) {
- break;
- }
- }
-
- return new VTuple(outTuple);
- }
-
- protected void loadRightToHashTable() throws IOException {
- ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
- if (scanExec.canBroadcast()) {
- /* If this table can broadcast, all tasks in a node will share the same cache */
- TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
- context, scanExec.getCanonicalName(), scanExec.getFragments());
- loadRightFromCache(key);
- } else {
- this.tupleSlots = buildRightToHashTable();
- }
-
- first = false;
- }
-
- protected void loadRightFromCache(TableCacheKey key) throws IOException {
- ExecutionBlockSharedResource sharedResource = context.getSharedResource();
- synchronized (sharedResource.getLock()) {
- if (sharedResource.hasBroadcastCache(key)) {
- CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
- this.tupleSlots = data.getData();
- this.cachedRightTableStats = data.getTableStats();
- } else {
- CacheHolder.BroadcastCacheHolder holder =
- new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
- sharedResource.addBroadcastCache(key, holder);
- CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
- this.tupleSlots = data.getData();
- this.cachedRightTableStats = data.getTableStats();
+ return outTuple;
}
- }
- }
-
- private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
- Tuple tuple;
- Tuple keyTuple;
- Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
-
- while (!context.isStopped() && (tuple = rightChild.next()) != null) {
- keyTuple = new VTuple(joinKeyPairs.size());
- for (int i = 0; i < rightKeyList.length; i++) {
- keyTuple.put(i, tuple.get(rightKeyList[i]));
+ Tuple leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = leftTuple == null;
+ continue;
}
- List<Tuple> newValue = map.get(keyTuple);
+ frameTuple.setLeft(leftTuple);
- if (newValue != null) {
- newValue.add(tuple);
- } else {
- newValue = new ArrayList<Tuple>();
- newValue.add(tuple);
- map.put(keyTuple, newValue);
+ // getting corresponding right
+ Iterable<Tuple> hashed = getRights(toKey(leftTuple));
+ Iterator<Tuple> rightTuples = rightFiltered(hashed);
+ if (rightTuples.hasNext()) {
+ iterator = rightTuples;
}
}
- return map;
- }
-
- @Override
- public void rescan() throws IOException {
- super.rescan();
-
- tupleSlots.clear();
- first = true;
-
- finished = false;
- iterator = null;
- shouldGetLeftTuple = true;
+ return null;
}
- @Override
- public void close() throws IOException {
- super.close();
- if (tupleSlots != null) {
- tupleSlots.clear();
- tupleSlots = null;
- }
-
- iterator = null;
+ private Iterable<Tuple> getRights(Tuple key) {
+ return tupleSlots.get(key);
}
- @Override
- public TableStats getInputStats() {
- if (leftChild == null) {
- return inputStats;
- }
- TableStats leftInputStats = leftChild.getInputStats();
- inputStats.setNumBytes(0);
- inputStats.setReadBytes(0);
- inputStats.setNumRows(0);
-
- if (leftInputStats != null) {
- inputStats.setNumBytes(leftInputStats.getNumBytes());
- inputStats.setReadBytes(leftInputStats.getReadBytes());
- inputStats.setNumRows(leftInputStats.getNumRows());
- }
-
- TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
- if (rightInputStats != null) {
- inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
- inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
- inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
- }
-
- return inputStats;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 881bf84..8239270 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -19,10 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
import java.io.IOException;
import java.util.List;
@@ -33,16 +31,10 @@ import java.util.List;
* If not found, it returns the tuple of the FROM side table with null padding.
*/
public class HashLeftAntiJoinExec extends HashJoinExec {
- private Tuple rightNullTuple;
public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
PhysicalExec notInSideChild) {
super(context, plan, fromSideChild, notInSideChild);
- // NUll Tuple
- rightNullTuple = new VTuple(leftChild.outColumnNum);
- for (int i = 0; i < leftChild.outColumnNum; i++) {
- rightNullTuple.put(i, NullDatum.get());
- }
}
/**
@@ -56,54 +48,33 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
* @return The tuple which is unmatched to a given join condition.
* @throws IOException
*/
+ @Override
public Tuple next() throws IOException {
if (first) {
loadRightToHashTable();
}
- Tuple rightTuple;
- boolean notFound;
-
while(!context.isStopped() && !finished) {
-
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- finished = true;
- return null;
- }
-
- // Try to find a hash bucket in in-memory hash table
- getKeyLeftTuple(leftTuple, leftKeyTuple);
- List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
- if (rightTuples != null) {
- // if found, it gets a hash bucket from the hash table.
- iterator = rightTuples.iterator();
- } else {
- // if not found, it returns a tuple.
- frameTuple.set(leftTuple, rightNullTuple);
+ if (iterator != null && iterator.hasNext()) {
+ frameTuple.setRight(iterator.next());
projector.eval(frameTuple, outTuple);
return outTuple;
}
-
- // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
- // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
- notFound = true;
- while (!context.isStopped() && notFound && iterator.hasNext()) {
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple);
- if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found
- notFound = false;
- }
+ // getting new outer
+ Tuple leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = leftTuple == null;
+ continue;
}
- if (notFound) { // if there is no matched tuple
- frameTuple.set(leftTuple, rightNullTuple);
- projector.eval(frameTuple, outTuple);
- break;
+ frameTuple.setLeft(leftTuple);
+
+ // Try to find a hash bucket in in-memory hash table
+ List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+ if (hashed == null || !rightFiltered(hashed).hasNext()) {
+ iterator = nullIterator(0);
}
}
-
- return outTuple;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 6f573d0..8613eac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,307 +18,61 @@
package org.apache.tajo.engine.planner.physical;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.utils.CacheHolder;
-import org.apache.tajo.engine.utils.TableCacheKey;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.plan.expr.AlgebraicUtil;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
+public class HashLeftOuterJoinExec extends HashJoinExec {
-public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
- // from logical plan
- protected JoinNode plan;
- protected EvalNode joinQual; // ex) a.id = b.id
- protected EvalNode joinFilter; // ex) a > 10
-
- protected List<Column[]> joinKeyPairs;
-
- // temporal tuples and states for nested loop join
- protected boolean first = true;
- protected FrameTuple frameTuple;
- protected Tuple outTuple = null;
- protected Map<Tuple, List<Tuple>> tupleSlots;
- protected Iterator<Tuple> iterator = null;
- protected Tuple leftTuple;
- protected Tuple leftKeyTuple;
-
- protected int [] leftKeyList;
- protected int [] rightKeyList;
-
- protected boolean finished = false;
- protected boolean shouldGetLeftTuple = true;
-
- // projection
- protected Projector projector;
-
- private int rightNumCols;
- private TableStats cachedRightTableStats;
private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
PhysicalExec rightChild) {
- super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
- plan.getOutSchema(), leftChild, rightChild);
- this.plan = plan;
-
- List<EvalNode> joinQuals = Lists.newArrayList();
- List<EvalNode> joinFilters = Lists.newArrayList();
- for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(plan.getJoinQual())) {
- if (EvalTreeUtil.isJoinQual(eachQual, true)) {
- joinQuals.add(eachQual);
- } else {
- joinFilters.add(eachQual);
- }
- }
-
- this.joinQual = AlgebraicUtil.createSingletonExprFromCNF(joinQuals.toArray(new EvalNode[joinQuals.size()]));
- if (joinFilters.size() > 0) {
- this.joinFilter = AlgebraicUtil.createSingletonExprFromCNF(joinFilters.toArray(new EvalNode[joinFilters.size()]));
- } else {
- this.joinFilter = null;
- }
-
- // HashJoin only can manage equi join key pairs.
- this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
- rightChild.getSchema(), false);
-
- leftKeyList = new int[joinKeyPairs.size()];
- rightKeyList = new int[joinKeyPairs.size()];
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
- }
-
- for (int i = 0; i < joinKeyPairs.size(); i++) {
- rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
- }
-
- // for projection
- this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
-
- // for join
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.size());
- leftKeyTuple = new VTuple(leftKeyList.length);
-
- rightNumCols = rightChild.getSchema().size();
-
- joinQual.bind(context.getEvalContext(), inSchema);
- if (joinFilter != null) {
- joinFilter.bind(context.getEvalContext(), inSchema);
- }
+ super(context, plan, leftChild, rightChild);
}
@Override
- protected void compile() {
- joinQual = context.getPrecompiledEval(inSchema, joinQual);
- }
-
- protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
- for (int i = 0; i < leftKeyList.length; i++) {
- keyTuple.put(i, outerTuple.get(leftKeyList[i]));
- }
- }
-
public Tuple next() throws IOException {
if (first) {
loadRightToHashTable();
}
- Tuple rightTuple;
- boolean found = false;
-
- while(!context.isStopped() && !finished) {
-
- if (shouldGetLeftTuple) { // initially, it is true.
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- finished = true;
- return null;
- }
-
- // getting corresponding right
- getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
- List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
- if (rightTuples != null) { // found right tuples on in-memory hash table.
- iterator = rightTuples.iterator();
- shouldGetLeftTuple = false;
- } else {
- // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(frameTuple, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- shouldGetLeftTuple = true;
- return outTuple;
- }
- }
-
- // getting a next right tuple on in-memory hash table.
- rightTuple = iterator.next();
- if (!iterator.hasNext()) { // no more right tuples for this hash key
- shouldGetLeftTuple = true;
- }
-
- frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-
- // if there is no join filter, it is always true.
- boolean satisfiedWithFilter = joinFilter == null || joinFilter.eval(frameTuple).isTrue();
- boolean satisfiedWithJoinCondition = joinQual.eval(frameTuple).isTrue();
-
- // if a composited tuple satisfies with both join filter and join condition
- if (satisfiedWithJoinCondition && satisfiedWithFilter) {
- projector.eval(frameTuple, outTuple);
- return outTuple;
- } else {
-
- // if join filter is satisfied, the left outer join (LOJ) operator should return the null padded tuple
- // only once. Then, LOJ operator should take the next left tuple.
- if (!satisfiedWithFilter) {
- shouldGetLeftTuple = true;
- }
-
- // null padding
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
-
+ while (!context.isStopped() && !finished) {
+ if (iterator != null && iterator.hasNext()) {
+ frameTuple.setRight(iterator.next());
projector.eval(frameTuple, outTuple);
return outTuple;
}
- }
-
- return outTuple;
- }
-
- protected void loadRightToHashTable() throws IOException {
- ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
- if (scanExec.canBroadcast()) {
- /* If this table can broadcast, all tasks in a node will share the same cache */
- TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
- context, scanExec.getCanonicalName(), scanExec.getFragments());
- loadRightFromCache(key);
- } else {
- this.tupleSlots = buildRightToHashTable();
- }
-
- first = false;
- }
-
- protected void loadRightFromCache(TableCacheKey key) throws IOException {
- ExecutionBlockSharedResource sharedResource = context.getSharedResource();
- synchronized (sharedResource.getLock()) {
- if (sharedResource.hasBroadcastCache(key)) {
- CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
- this.tupleSlots = data.getData();
- this.cachedRightTableStats = data.getTableStats();
- } else {
- CacheHolder.BroadcastCacheHolder holder =
- new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
- sharedResource.addBroadcastCache(key, holder);
- CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
- this.tupleSlots = data.getData();
- this.cachedRightTableStats = data.getTableStats();
+ Tuple leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = true;
+ return null;
}
- }
- }
+ frameTuple.setLeft(leftTuple);
- private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
- Tuple tuple;
- Tuple keyTuple;
- Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
-
- while (!context.isStopped() && (tuple = rightChild.next()) != null) {
- keyTuple = new VTuple(joinKeyPairs.size());
- for (int i = 0; i < rightKeyList.length; i++) {
- keyTuple.put(i, tuple.get(rightKeyList[i]));
+ if (leftFiltered(leftTuple)) {
+ iterator = nullIterator(rightNumCols);
+ continue;
}
- List<Tuple> newValue = map.get(keyTuple);
-
- if (newValue != null) {
- newValue.add(tuple);
- } else {
- newValue = new ArrayList<Tuple>();
- newValue.add(tuple);
- map.put(keyTuple, newValue);
+ // getting corresponding right
+ List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+ Iterator<Tuple> rightTuples = rightFiltered(hashed);
+ if (!rightTuples.hasNext()) {
+ // this left tuple has no matching right tuple, but a left outer join must still emit it:
+ // output the left tuple padded with a null right tuple
+ iterator = nullIterator(rightNumCols);
+ continue;
}
+ iterator = rightTuples;
}
- return map;
- }
-
- @Override
- public void rescan() throws IOException {
- super.rescan();
-
- tupleSlots.clear();
- first = true;
-
- finished = false;
- iterator = null;
- shouldGetLeftTuple = true;
- }
-
-
- @Override
- public void close() throws IOException {
- super.close();
- tupleSlots.clear();
- tupleSlots = null;
- iterator = null;
- plan = null;
- joinQual = null;
- joinFilter = null;
- projector = null;
- }
-
- public JoinNode getPlan() {
- return this.plan;
- }
-
- @Override
- public TableStats getInputStats() {
- if (leftChild == null) {
- return inputStats;
- }
- TableStats leftInputStats = leftChild.getInputStats();
- inputStats.setNumBytes(0);
- inputStats.setReadBytes(0);
- inputStats.setNumRows(0);
-
- if (leftInputStats != null) {
- inputStats.setNumBytes(leftInputStats.getNumBytes());
- inputStats.setReadBytes(leftInputStats.getReadBytes());
- inputStats.setNumRows(leftInputStats.getNumRows());
- }
-
- TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
- if (rightInputStats != null) {
- inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
- inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
- inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
- }
-
- return inputStats;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 32e6d08..41e842a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -50,50 +50,34 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
* @return The tuple which is firstly matched to a given join condition.
* @throws java.io.IOException
*/
+ @Override
public Tuple next() throws IOException {
if (first) {
loadRightToHashTable();
}
- Tuple rightTuple;
- boolean notFound;
-
while(!context.isStopped() && !finished) {
-
- // getting new outer
- leftTuple = leftChild.next(); // it comes from a disk
- if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
- finished = true;
- return null;
+ if (iterator != null && iterator.hasNext()) {
+ frameTuple.setRight(iterator.next());
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
}
-
- // Try to find a hash bucket in in-memory hash table
- getKeyLeftTuple(leftTuple, leftKeyTuple);
- List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
- if (rightTuples != null) {
- // if found, it gets a hash bucket from the hash table.
- iterator = rightTuples.iterator();
- } else {
+ // getting new outer
+ Tuple leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = leftTuple == null;
continue;
}
- // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
- // If it finds any matched tuple, it returns the tuple immediately.
- notFound = true;
- while (notFound && iterator.hasNext()) {
- rightTuple = iterator.next();
- frameTuple.set(leftTuple, rightTuple);
- if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found
- notFound = false;
- projector.eval(frameTuple, outTuple);
- }
- }
+ frameTuple.setLeft(leftTuple);
- if (!notFound) { // if there is no matched tuple
- break;
+ // Try to find a hash bucket in in-memory hash table
+ List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+ if (hashed != null && rightFiltered(hashed).hasNext()) {
+ // if found, it gets a hash bucket from the hash table.
+ iterator = nullIterator(0);
}
}
-
- return outTuple;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
deleted file mode 100644
index 735623d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class NLLeftOuterJoinExec extends CommonJoinExec {
- // temporal tuples and states for nested loop join
- private boolean needNextRightTuple;
- private FrameTuple frameTuple;
- private Tuple leftTuple = null;
- private Tuple rightTuple = null;
- private Tuple outTuple = null;
-
- private boolean foundAtLeastOneMatch;
- private int rightNumCols;
-
- public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
- PhysicalExec rightChild) {
- super(context, plan, leftChild, rightChild);
- // for join
- needNextRightTuple = true;
- frameTuple = new FrameTuple();
- outTuple = new VTuple(outSchema.size());
-
- foundAtLeastOneMatch = false;
- rightNumCols = rightChild.getSchema().size();
- }
-
- public Tuple next() throws IOException {
- while (!context.isStopped()) {
- if (needNextRightTuple) {
- leftTuple = leftChild.next();
- if (leftTuple == null) {
- return null;
- }
- needNextRightTuple = false;
- // a new tuple from the left child has initially no matches on the right operand
- foundAtLeastOneMatch = false;
- }
- rightTuple = rightChild.next();
-
- if (rightTuple == null) {
- // the scan of the right operand is finished with no matches found
- if(foundAtLeastOneMatch == false){
- //output a tuple with the nulls padded rightTuple
- Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
- frameTuple.set(leftTuple, nullPaddedTuple);
- projector.eval(frameTuple, outTuple);
- // we simulate we found a match, which is exactly the null padded one
- foundAtLeastOneMatch = true;
- needNextRightTuple = true;
- rightChild.rescan();
- return outTuple;
- } else {
- needNextRightTuple = true;
- rightChild.rescan();
- continue;
- }
- }
-
- frameTuple.set(leftTuple, rightTuple);
- ;
- if (joinQual.eval(frameTuple).isTrue()) {
- projector.eval(frameTuple, outTuple);
- foundAtLeastOneMatch = true;
- return outTuple;
- }
- }
- return null;
- }
-
- @Override
- public void rescan() throws IOException {
- super.rescan();
- needNextRightTuple = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index 505b599..c4d90a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -79,9 +79,6 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException;
- RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
- throws PhysicalPlanningException;
-
RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
throws PhysicalPlanningException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 7abfbe6..fd825b1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -102,7 +102,6 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
* @throws IOException
*/
public Tuple next() throws IOException {
- Tuple previous;
while (!context.isStopped()) {
boolean newRound = false;
@@ -121,7 +120,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
// The finalizing stage, where remaining tuples on the only right are transformed into left-padded results
if (end) {
- if (initRightDone == false) {
+ if (!initRightDone) {
// maybe the left operand was empty => the right one didn't have the chance to initialize
rightTuple = rightChild.next();
initRightDone = true;
@@ -160,18 +159,24 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
}
}
- if(rightTuple == null){
+ if(rightTuple == null) {
rightTuple = rightChild.next();
-
- if(rightTuple != null){
- initRightDone = true;
- }
- else {
+ if (rightTuple == null) {
initRightDone = true;
end = true;
continue;
}
}
+ if (rightFiltered(rightTuple)) {
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(frameTuple, outTuple);
+
+ rightTuple = null;
+ return outTuple;
+ }
+ initRightDone = true;
+
//////////////////////////////////////////////////////////////////////
// END INITIALIZATION STAGE
//////////////////////////////////////////////////////////////////////
@@ -203,10 +208,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
// we simulate we found a match, which is exactly the null padded one
// BEFORE RETURN, MOVE FORWARD
- rightTuple = rightChild.next();
- if(rightTuple == null) {
- end = true;
- }
+ rightTuple = null;
return outTuple;
} else if (cmp < 0) {
@@ -223,6 +225,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
// END MOVE FORWARDING STAGE
//////////////////////////////////////////////////////////////////////
+ Tuple previous = null;
// once a match is found, retain all tuples with this key in tuple slots on each side
if(!end) {
endInPopulationStage = false;
@@ -257,6 +260,19 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
endInPopulationStage = true;
}
} // if end false
+ if (previous != null && rightFiltered(previous)) {
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, previous);
+ projector.eval(frameTuple, outTuple);
+
+ // reset tuple slots for a new round
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+
+ return outTuple;
+ }
} // if newRound
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
index 6a5c0bf..addca49 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.utils;
-import com.google.common.collect.Maps;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.storage.Tuple;
@@ -66,7 +65,7 @@ public interface CacheHolder<T> {
@Override
public Map<Tuple, List<Tuple>> getData() {
- return Maps.newHashMap(data);
+ return data;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 95debd4..7210214 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -206,10 +206,10 @@ public class TestHashSemiJoinExec {
// expect result without duplicated tuples.
while ((tuple = exec.next()) != null) {
count++;
- assertTrue(i == tuple.get(0).asInt4());
- assertTrue(i == tuple.get(1).asInt4());
- assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
- assertTrue(10 + i == tuple.get(3).asInt4());
+ assertEquals(i, tuple.get(0).asInt4());
+ assertEquals(i, tuple.get(1).asInt4());
+ assertEquals("dept_" + i, tuple.get(2).asChars());
+ assertEquals(10 + i, tuple.get(3).asInt4());
i += 2;
}