You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/20 05:41:01 UTC
git commit: TAJO-609: PlannerUtil::getRelationLineage ignores
PartitionedTableScanNode.
Repository: incubator-tajo
Updated Branches:
refs/heads/master 532f6f3f4 -> 57bf90e22
TAJO-609: PlannerUtil::getRelationLineage ignores PartitionedTableScanNode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/57bf90e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/57bf90e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/57bf90e2
Branch: refs/heads/master
Commit: 57bf90e220d881f39f0c91877edc57be7005a596
Parents: 532f6f3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Feb 20 13:40:14 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Feb 20 13:40:14 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../engine/planner/PhysicalPlannerImpl.java | 98 +++++++++++++-----
.../apache/tajo/engine/planner/PlannerUtil.java | 32 +++---
.../apache/tajo/LocalTajoTestingUtility.java | 9 +-
.../planner/physical/TestHashJoinExec.java | 102 ++++++++++++++++++-
5 files changed, 199 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af59b4c..f472c87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -256,6 +256,9 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-609: PlannerUtil::getRelationLineage ignores PartitionedTableScanNode.
+ (hyunsik)
+
TAJO-606: Statemachine visualization fails. (Min Zhou via hyunsik)
TAJO-595: The same expressions without different alias are not allowed.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5583efd..67f5630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -21,6 +21,7 @@
*/
package org.apache.tajo.engine.planner;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -50,6 +52,7 @@ import java.util.Stack;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
@@ -60,6 +63,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
private static final int UNGENERATED_PID = -1;
+ private final long INNER_JOIN_INMEMORY_HASH_THRESHOLD;
protected final TajoConf conf;
protected final AbstractStorageManager sm;
@@ -67,6 +71,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
this.conf = conf;
this.sm = sm;
+
+ this.INNER_JOIN_INMEMORY_HASH_THRESHOLD = conf.getLongVar(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
}
public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
@@ -215,7 +221,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
+ @VisibleForTesting
+ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
long size = 0;
for (String tableId : tableIds) {
// TODO - CSV is a hack.
@@ -228,6 +235,21 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return size;
}
+ @VisibleForTesting
+ public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
+ throws IOException {
+ String [] lineage = PlannerUtil.getRelationLineage(node);
+ long volume = estimateSizeRecursive(context, lineage);
+ boolean inMemoryInnerJoinFlag = volume <= INNER_JOIN_INMEMORY_HASH_THRESHOLD;
+ LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
+ context.getTaskId().toString(),
+ (left ? "Left" : "Right"),
+ TUtil.arrayToString(lineage),
+ FileUtil.humanReadableByteCount(volume, false),
+ (inMemoryInnerJoinFlag ? "" : "not ")));
+ return inMemoryInnerJoinFlag;
+ }
+
public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
PhysicalExec rightExec) throws IOException {
@@ -307,7 +329,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new BNLJoinExec(context, plan, leftExec, rightExec);
case IN_MEMORY_HASH_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
- return new HashJoinExec(context, plan, leftExec, rightExec);
+ // switchJoinSidesIfNecessary returns two PhysicalExecs: index 0 is the smaller side, and index 1 is the larger side.
+ PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+ return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
case MERGE_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
return createMergeInnerJoin(context, plan, leftExec, rightExec);
@@ -318,42 +342,62 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
return createMergeInnerJoin(context, plan, leftExec, rightExec);
}
-
-
} else {
return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
}
}
- private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
- PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ /**
+ * Returns the two given {@link org.apache.tajo.engine.planner.physical.PhysicalExec}s sorted in ascending order of
+ * their child relations' estimated total volume. In other words, the smaller side is returned as element 0 of the
+ * array, and the larger side is returned as element 1.
+ */
+ @VisibleForTesting
+ public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec left, PhysicalExec right) throws IOException {
String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
-
- boolean hashJoin = false;
- if (leftSize < threshold || rightSize < threshold) {
- hashJoin = true;
+ PhysicalExec smaller;
+ PhysicalExec larger;
+ if (leftSize <= rightSize) {
+ smaller = left;
+ larger = right;
+ LOG.info(String.format("[%s] Left relations %s (%s) is smaller than Right relations %s (%s).",
+ context.getTaskId().toString(),
+ TUtil.arrayToString(leftLineage),
+ FileUtil.humanReadableByteCount(leftSize, false),
+ TUtil.arrayToString(rightLineage),
+ FileUtil.humanReadableByteCount(rightSize, false)));
+ } else {
+ smaller = right;
+ larger = left;
+ LOG.info(String.format("[%s] Right relations %s (%s) is smaller than Left relations %s (%s).",
+ context.getTaskId().toString(),
+ TUtil.arrayToString(rightLineage),
+ FileUtil.humanReadableByteCount(rightSize, false),
+ TUtil.arrayToString(leftLineage),
+ FileUtil.humanReadableByteCount(leftSize, false)));
}
- if (hashJoin) {
- PhysicalExec selectedOuter;
- PhysicalExec selectedInner;
+ return new PhysicalExec [] {smaller, larger};
+ }
- // HashJoinExec loads the inner relation to memory.
- if (leftSize <= rightSize) {
- selectedInner = leftExec;
- selectedOuter = rightExec;
- } else {
- selectedInner = rightExec;
- selectedOuter = leftExec;
- }
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ boolean inMemoryHashJoin = false;
+ if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
+ || checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
+ inMemoryHashJoin = true;
+ }
- LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
- return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+ if (inMemoryHashJoin) {
+ LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+ // switchJoinSidesIfNecessary returns two PhysicalExecs: index 0 is the smaller side, and index 1 is the larger side.
+ PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+ return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
} else {
return createMergeInnerJoin(context, plan, leftExec, rightExec);
}
@@ -409,7 +453,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
- if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
+ if (rightTableVolume < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -427,7 +471,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
// blocking, but merge join is blocking as well)
String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
long outerSize = estimateSizeRecursive(context, outerLineage4);
- if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
+ if (outerSize < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
@@ -894,7 +938,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
+ final long threshold = conf.getLongVar(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
// if the relation size is less than the threshold,
// the hash aggregation will be used.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 624518b..7d5e2fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.CountRowsFunctionExpr;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.GeneralSetFunctionExpr;
+import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -54,13 +57,13 @@ public class PlannerUtil {
}
/**
- * Get all scan nodes from a logical operator tree.
+ * Get the names of all relations (scan and partitioned-scan nodes) which are descendants of a given LogicalNode.
*
- * @param node a start node
- * @return an array of relation names
+ * @param from The LogicalNode to start visiting LogicalNodes.
+ * @return an array of the names of all relations that are descendants of the given LogicalNode.
*/
- public static String [] getRelationLineage(LogicalNode node) {
- LogicalNode [] scans = PlannerUtil.findAllNodes(node, NodeType.SCAN);
+ public static String [] getRelationLineage(LogicalNode from) {
+ LogicalNode [] scans = findAllNodes(from, NodeType.SCAN, NodeType.PARTITIONS_SCAN);
String [] tableNames = new String[scans.length];
ScanNode scan;
for (int i = 0; i < scans.length; i++) {
@@ -71,15 +74,16 @@ public class PlannerUtil {
}
/**
- * Get all scan nodes from a logical operator tree within a query block
+ * Get the names of all relations which are descendants of a given LogicalNode.
+ * The search is restricted to a single query block.
*
- * @param node a start node
- * @return an array of relation names
+ * @param from The LogicalNode to start visiting LogicalNodes.
+ * @return a collection of the names of all relations found under the given LogicalNode within the query block.
*/
- public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode node)
+ public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode from)
throws PlanningException {
RelationFinderVisitor visitor = new RelationFinderVisitor();
- visitor.visit(null, plan, null, node, new Stack<LogicalNode>());
+ visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
return visitor.getFoundRelations();
}
@@ -297,7 +301,7 @@ public class PlannerUtil {
* @param type to find
* @return a found logical node
*/
- public static LogicalNode [] findAllNodes(LogicalNode node, NodeType type) {
+ public static LogicalNode [] findAllNodes(LogicalNode node, NodeType...type) {
Preconditions.checkNotNull(node);
Preconditions.checkNotNull(type);
@@ -439,6 +443,10 @@ public class PlannerUtil {
public List<LogicalNode> getFoundNodes() {
return list;
}
+
+ public LogicalNode [] getFoundNodeArray() {
+ return list.toArray(new LogicalNode[list.size()]);
+ }
}
private static class ParentNodeFinder implements LogicalNodeVisitor {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index ae59d11..50fed7b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,7 +23,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
@@ -40,9 +43,11 @@ public class LocalTajoTestingUtility {
private TajoConf conf;
private TajoClient client;
+ private static int taskAttemptId;
+
public static QueryUnitAttemptId newQueryUnitAttemptId() {
return QueryIdFactory.newQueryUnitAttemptId(
- QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), 0);
+ QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++);
}
public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 48cd265..8d319ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,8 +35,10 @@ import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,8 +46,7 @@ import org.junit.Test;
import java.io.IOException;
import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestHashJoinExec {
private TajoConf conf;
@@ -172,4 +171,99 @@ public class TestHashJoinExec {
exec.close();
assertEquals(10 / 2, count);
}
+
+ @Test
+ public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, PlanningException {
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+ JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(enforcer);
+
+ TajoConf localConf = new TajoConf(conf);
+ localConf.setLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD, 100l);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(localConf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ assertTrue(proj.getChild() instanceof HashJoinExec);
+ HashJoinExec joinExec = proj.getChild();
+
+ assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+ }
+
+ /**
+ * Checks the inner-join related functions. It returns true if the left relation is smaller than the right relation.
+ *
+ * The assertions below depend on which side is smaller. In this unit test, we use two tables: p and e.
+ * The table p is 75 bytes, and the table e is 140 bytes. Since we cannot know in advance which side is smaller,
+ * we use a boolean variable <code>leftSmaller</code> to indicate whether the left side is the smaller one.
+ */
+ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
+ PhysicalPlannerImpl phyPlanner,
+ JoinNode joinNode, BinaryPhysicalExec joinExec) throws
+ IOException {
+
+ String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
+ String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
+
+ boolean leftSmaller;
+ if (left[0].equals("p")) {
+ leftSmaller = true;
+ } else {
+ leftSmaller = false;
+ }
+
+ long leftSize = phyPlanner.estimateSizeRecursive(ctx, left);
+ long rightSize = phyPlanner.estimateSizeRecursive(ctx, right);
+
+ // The table p is 75 bytes, and the table e is 140 bytes.
+ if (leftSmaller) { // if left one is smaller
+ assertEquals(75, leftSize);
+ assertEquals(140, rightSize);
+ } else { // if right one is smaller
+ assertEquals(140, leftSize);
+ assertEquals(75, rightSize);
+ }
+
+ if (leftSmaller) {
+ PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+ joinExec.getRightChild());
+ assertEquals(ordered[0], joinExec.getLeftChild());
+ assertEquals(ordered[1], joinExec.getRightChild());
+
+ assertEquals("p", left[0]);
+ assertEquals("e", right[0]);
+ } else {
+ PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+ joinExec.getRightChild());
+ assertEquals(ordered[1], joinExec.getLeftChild());
+ assertEquals(ordered[0], joinExec.getRightChild());
+
+ assertEquals("e", left[0]);
+ assertEquals("p", right[0]);
+ }
+
+ if (leftSmaller) {
+ assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+ assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+ } else {
+ assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+ assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+ }
+
+ return leftSmaller;
+ }
}