You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/20 05:41:01 UTC

git commit: TAJO-609: PlannerUtil::getRelationLineage ignores PartitionedTableScanNode.

Repository: incubator-tajo
Updated Branches:
  refs/heads/master 532f6f3f4 -> 57bf90e22


TAJO-609: PlannerUtil::getRelationLineage ignores PartitionedTableScanNode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/57bf90e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/57bf90e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/57bf90e2

Branch: refs/heads/master
Commit: 57bf90e220d881f39f0c91877edc57be7005a596
Parents: 532f6f3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Feb 20 13:40:14 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Feb 20 13:40:14 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../engine/planner/PhysicalPlannerImpl.java     |  98 +++++++++++++-----
 .../apache/tajo/engine/planner/PlannerUtil.java |  32 +++---
 .../apache/tajo/LocalTajoTestingUtility.java    |   9 +-
 .../planner/physical/TestHashJoinExec.java      | 102 ++++++++++++++++++-
 5 files changed, 199 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af59b4c..f472c87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -256,6 +256,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-609: PlannerUtil::getRelationLineage ignores PartitionedTableScanNode.
+    (hyunsik)
+
     TAJO-606: Statemachine visualization fails. (Min Zhou via hyunsik)
 
     TAJO-595: The same expressions without different alias are not allowed.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5583efd..67f5630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -21,6 +21,7 @@
  */
 package org.apache.tajo.engine.planner;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -50,6 +52,7 @@ import java.util.Stack;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
@@ -60,6 +63,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
   private static final int UNGENERATED_PID = -1;
+  private final long INNER_JOIN_INMEMORY_HASH_THRESHOLD;
 
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
@@ -67,6 +71,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
     this.conf = conf;
     this.sm = sm;
+
+    this.INNER_JOIN_INMEMORY_HASH_THRESHOLD = conf.getLongVar(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
   }
 
   public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
@@ -215,7 +221,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
+  @VisibleForTesting
+  public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
     long size = 0;
     for (String tableId : tableIds) {
       // TODO - CSV is a hack.
@@ -228,6 +235,21 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return size;
   }
 
+  @VisibleForTesting
+  public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
+      throws IOException {
+    String [] lineage = PlannerUtil.getRelationLineage(node);
+    long volume = estimateSizeRecursive(context, lineage);
+    boolean inMemoryInnerJoinFlag = volume <= INNER_JOIN_INMEMORY_HASH_THRESHOLD;
+    LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
+        context.getTaskId().toString(),
+        (left ? "Left" : "Right"),
+        TUtil.arrayToString(lineage),
+        FileUtil.humanReadableByteCount(volume, false),
+        (inMemoryInnerJoinFlag ? "" : "not ")));
+    return inMemoryInnerJoinFlag;
+  }
+
   public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
                                      PhysicalExec rightExec) throws IOException {
 
@@ -307,7 +329,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           return new BNLJoinExec(context, plan, leftExec, rightExec);
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
-          return new HashJoinExec(context, plan, leftExec, rightExec);
+          // returns two PhysicalExec. smaller one is 0, and larger one is 1.
+          PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+          return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
         case MERGE_JOIN:
           LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
           return createMergeInnerJoin(context, plan, leftExec, rightExec);
@@ -318,42 +342,62 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
           return createMergeInnerJoin(context, plan, leftExec, rightExec);
       }
-
-
     } else {
       return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+  /**
+   * It returns two {@link org.apache.tajo.engine.planner.physical.PhysicalExec}s sorted in ascending order of
+   * their child relations' total volume. In other words, the smaller side is returned at index 0 of the array,
+   * and the larger side is returned at index 1.
+   */
+  @VisibleForTesting
+  public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan,
+                                                     PhysicalExec left, PhysicalExec right) throws IOException {
     String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
     String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long leftSize = estimateSizeRecursive(context, leftLineage);
     long rightSize = estimateSizeRecursive(context, rightLineage);
 
-    final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
-
-    boolean hashJoin = false;
-    if (leftSize < threshold || rightSize < threshold) {
-      hashJoin = true;
+    PhysicalExec smaller;
+    PhysicalExec larger;
+    if (leftSize <= rightSize) {
+      smaller = left;
+      larger = right;
+      LOG.info(String.format("[%s] Left relations %s (%s) is smaller than Right relations %s (%s).",
+          context.getTaskId().toString(),
+          TUtil.arrayToString(leftLineage),
+          FileUtil.humanReadableByteCount(leftSize, false),
+          TUtil.arrayToString(rightLineage),
+          FileUtil.humanReadableByteCount(rightSize, false)));
+    } else {
+      smaller = right;
+      larger = left;
+      LOG.info(String.format("[%s] Right relations %s (%s) is smaller than Left relations %s (%s).",
+          context.getTaskId().toString(),
+          TUtil.arrayToString(rightLineage),
+          FileUtil.humanReadableByteCount(rightSize, false),
+          TUtil.arrayToString(leftLineage),
+          FileUtil.humanReadableByteCount(leftSize, false)));
     }
 
-    if (hashJoin) {
-      PhysicalExec selectedOuter;
-      PhysicalExec selectedInner;
+    return new PhysicalExec [] {smaller, larger};
+  }
 
-      // HashJoinExec loads the inner relation to memory.
-      if (leftSize <= rightSize) {
-        selectedInner = leftExec;
-        selectedOuter = rightExec;
-      } else {
-        selectedInner = rightExec;
-        selectedOuter = leftExec;
-      }
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    boolean inMemoryHashJoin = false;
+    if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
+        || checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
+      inMemoryHashJoin = true;
+    }
 
-      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
-      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+    if (inMemoryHashJoin) {
+      LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+      // returns two PhysicalExec. smaller one is 0, and larger one is 1.
+      PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+      return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
     } else {
       return createMergeInnerJoin(context, plan, leftExec, rightExec);
     }
@@ -409,7 +453,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long rightTableVolume = estimateSizeRecursive(context, rightLineage);
 
-    if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
+    if (rightTableVolume < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
       // we can implement left outer join using hash join, using the right operand as the build relation
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
       return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -427,7 +471,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     // blocking, but merge join is blocking as well)
     String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
     long outerSize = estimateSizeRecursive(context, outerLineage4);
-    if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
+    if (outerSize < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
       LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
       return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
     } else {
@@ -894,7 +938,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
-    final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
+    final long threshold = conf.getLongVar(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
 
     // if the relation size is less than the threshold,
     // the hash aggregation will be used.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 624518b..7d5e2fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.CountRowsFunctionExpr;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.GeneralSetFunctionExpr;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -54,13 +57,13 @@ public class PlannerUtil {
   }
 
   /**
-   * Get all scan nodes from a logical operator tree.
+   * Get the names of all relations which are descendants of a given LogicalNode.
    *
-   * @param node a start node
-   * @return an array of relation names
+   * @param from The LogicalNode to start visiting LogicalNodes.
+   * @return an array of the names of all descendant relations of the given LogicalNode.
    */
-  public static String [] getRelationLineage(LogicalNode node) {
-    LogicalNode [] scans =  PlannerUtil.findAllNodes(node, NodeType.SCAN);
+  public static String [] getRelationLineage(LogicalNode from) {
+    LogicalNode [] scans = findAllNodes(from, NodeType.SCAN, NodeType.PARTITIONS_SCAN);
     String [] tableNames = new String[scans.length];
     ScanNode scan;
     for (int i = 0; i < scans.length; i++) {
@@ -71,15 +74,16 @@ public class PlannerUtil {
   }
 
   /**
-   * Get all scan nodes from a logical operator tree within a query block
+   * Get the names of all relations which are descendants of a given LogicalNode.
+   * The search is restricted to a single query block.
    *
-   * @param node a start node
-   * @return an array of relation names
+   * @param from The LogicalNode to start visiting LogicalNodes.
+   * @return a collection of the names of all descendant relations of the given LogicalNode.
    */
-  public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode node)
+  public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode from)
       throws PlanningException {
     RelationFinderVisitor visitor = new RelationFinderVisitor();
-    visitor.visit(null, plan, null, node, new Stack<LogicalNode>());
+    visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
     return visitor.getFoundRelations();
   }
 
@@ -297,7 +301,7 @@ public class PlannerUtil {
    * @param type to find
    * @return a found logical node
    */
-  public static LogicalNode [] findAllNodes(LogicalNode node, NodeType type) {
+  public static LogicalNode [] findAllNodes(LogicalNode node, NodeType...type) {
     Preconditions.checkNotNull(node);
     Preconditions.checkNotNull(type);
 
@@ -439,6 +443,10 @@ public class PlannerUtil {
     public List<LogicalNode> getFoundNodes() {
       return list;
     }
+
+    public LogicalNode [] getFoundNodeArray() {
+      return list.toArray(new LogicalNode[list.size()]);
+    }
   }
   
   private static class ParentNodeFinder implements LogicalNodeVisitor {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index ae59d11..50fed7b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,7 +23,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
@@ -40,9 +43,11 @@ public class LocalTajoTestingUtility {
   private TajoConf conf;
   private TajoClient client;
 
+  private static int taskAttemptId;
+
   public static QueryUnitAttemptId newQueryUnitAttemptId() {
     return QueryIdFactory.newQueryUnitAttemptId(
-        QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), 0);
+        QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++);
   }
   public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
     return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/57bf90e2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 48cd265..8d319ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,8 +35,10 @@ import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,8 +46,7 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestHashJoinExec {
   private TajoConf conf;
@@ -172,4 +171,99 @@ public class TestHashJoinExec {
     exec.close();
     assertEquals(10 / 2, count);
   }
+
+  @Test
+  public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    TajoConf localConf = new TajoConf(conf);
+    localConf.setLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD, 100l);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(localConf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+    HashJoinExec joinExec = proj.getChild();
+
+    assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+  }
+
+  /**
+   * Checks the inner-join related functions. It returns true if the left relation is smaller than the right one.
+   *
+   * The assertions below depend on which side is smaller. This unit test uses two tables: p and e.
+   * The table p is 75 bytes, and the table e is 140 bytes. Since we cannot know in advance which side is smaller,
+   * we use a boolean variable <code>leftSmaller</code> to indicate which side is smaller.
+   */
+  private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
+                                                       PhysicalPlannerImpl phyPlanner,
+                                                       JoinNode joinNode, BinaryPhysicalExec joinExec) throws
+      IOException {
+
+    String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
+    String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
+
+    boolean leftSmaller;
+    if (left[0].equals("p")) {
+      leftSmaller = true;
+    } else {
+      leftSmaller = false;
+    }
+
+    long leftSize = phyPlanner.estimateSizeRecursive(ctx, left);
+    long rightSize = phyPlanner.estimateSizeRecursive(ctx, right);
+
+    // The table p is 75 bytes, and the table e is 140 bytes.
+    if (leftSmaller) { // if left one is smaller
+      assertEquals(75, leftSize);
+      assertEquals(140, rightSize);
+    } else { // if right one is smaller
+      assertEquals(140, leftSize);
+      assertEquals(75, rightSize);
+    }
+
+    if (leftSmaller) {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[0], joinExec.getLeftChild());
+      assertEquals(ordered[1], joinExec.getRightChild());
+
+      assertEquals("p", left[0]);
+      assertEquals("e", right[0]);
+    } else {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[1], joinExec.getLeftChild());
+      assertEquals(ordered[0], joinExec.getRightChild());
+
+      assertEquals("e", left[0]);
+      assertEquals("p", right[0]);
+    }
+
+    if (leftSmaller) {
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    } else {
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    }
+
+    return leftSmaller;
+  }
 }