Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/05 01:50:32 UTC

[flink] branch master updated: [FLINK-13089][table-planner-blink] Implement batch nested loop join and add some join itcases

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd0cf36  [FLINK-13089][table-planner-blink] Implement batch nested loop join and add some join itcases
cd0cf36 is described below

commit cd0cf360e8831dd5116579283832f919e5ce82dc
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Fri Jul 5 09:50:22 2019 +0800

    [FLINK-13089][table-planner-blink] Implement batch nested loop join and add some join itcases
    
    This closes #8978
---
 .../codegen/NestedLoopJoinCodeGenerator.scala      | 369 ++++++++++++++++
 .../physical/batch/BatchExecNestedLoopJoin.scala   |  52 ++-
 .../runtime/batch/sql/Limit0RemoveITCase.scala     |  10 +-
 .../flink/table/runtime/batch/sql/MiscITCase.scala |   5 +-
 .../batch/sql/agg/AggregateRemoveITCase.scala      |  10 +-
 .../batch/sql/agg/PruneAggregateCallITCase.scala   |  68 ++-
 .../runtime/batch/sql/join/InnerJoinITCase.scala   |  23 +-
 .../table/runtime/batch/sql/join/JoinITCase.scala  |   9 +-
 .../batch/sql/join/JoinWithoutKeyITCase.scala      | 374 ++++++++++++++++
 .../runtime/batch/sql/join/OuterJoinITCase.scala   |  20 +-
 .../runtime/batch/sql/join/ScalarQueryITCase.scala |  57 +++
 .../runtime/batch/sql/join/SemiJoinITCase.scala    | 491 +++++++++++++++++++++
 12 files changed, 1415 insertions(+), 73 deletions(-)
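
The new NestedLoopJoinCodeGenerator below emits a two-input batch operator that first buffers one side (the build side) and then streams the other side (the probe side) against it, evaluating the join condition for every build/probe pair. A minimal hand-written sketch of that inner-join behavior, assuming plain in-memory sequences rather than Flink's BaseRow inputs and generated code (the names below are illustrative, not part of the commit):

    // Illustrative only: the behavior the generated inner-join path implements.
    def nestedLoopInnerJoin[B, P, O](build: Seq[B], probe: Seq[P])
        (cond: (B, P) => Boolean)(project: (B, P) => O): Seq[O] =
      for {
        p <- probe        // single pass over the probe side
        b <- build        // full scan of the buffered build side per probe row
        if cond(b, p)     // the generated condition in the real operator
      } yield project(b, p)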

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
new file mode 100644
index 0000000..264a52a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, BINARY_ROW, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, className, newName}
+import org.apache.flink.table.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
+import org.apache.flink.table.dataformat.{BaseRow, JoinedRow}
+import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.runtime.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.join.FlinkJoinType
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.AbstractRowSerializer
+
+import org.apache.calcite.rex.RexNode
+
+import java.util
+
+/**
+  * Code gen for nested loop join.
+  */
+class NestedLoopJoinCodeGenerator(
+    ctx: CodeGeneratorContext,
+    singleRowJoin: Boolean,
+    leftIsBuild: Boolean,
+    leftType: RowType,
+    rightType: RowType,
+    outputType: RowType,
+    joinType: FlinkJoinType,
+    condition: RexNode) {
+
+  val (buildRow, buildArity, probeRow, probeArity) = {
+    val leftArity = leftType.getFieldCount
+    val rightArity = rightType.getFieldCount
+    if (leftIsBuild) {
+      (DEFAULT_INPUT1_TERM, leftArity, DEFAULT_INPUT2_TERM, rightArity)
+    } else {
+      (DEFAULT_INPUT2_TERM, rightArity, DEFAULT_INPUT1_TERM, leftArity)
+    }
+  }
+
+  def gen(): CodeGenOperatorFactory[BaseRow] = {
+    val config = ctx.tableConfig
+
+    val exprGenerator = new ExprCodeGenerator(ctx, joinType.isOuter)
+        .bindInput(leftType).bindSecondInput(rightType)
+
+    // we use ResettableExternalBuffer to prevent OOM
+    val buffer = newName("resettableExternalBuffer")
+    val iter = newName("iter")
+
+    // the input row might not be a binary row, so a serializer is needed
+    val isFirstRow = newName("isFirstRow")
+    val isBinaryRow = newName("isBinaryRow")
+
+    val externalBufferMemorySize = config.getConf.getInteger(
+      TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+
+    if (singleRowJoin) {
+      ctx.addReusableMember(s"$BASE_ROW $buildRow = null;")
+    } else {
+      ctx.addReusableMember(s"boolean $isFirstRow = true;")
+      ctx.addReusableMember(s"boolean $isBinaryRow = false;")
+
+      val serializer = newName("serializer")
+      def initSerializer(i: Int): Unit = {
+        ctx.addReusableOpenStatement(
+          s"""
+             |${className[AbstractRowSerializer[_]]} $serializer =
+             |  (${className[AbstractRowSerializer[_]]}) getOperatorConfig()
+             |    .getTypeSerializerIn$i(getUserCodeClassloader());
+             |""".stripMargin)
+      }
+      if (leftIsBuild) initSerializer(1) else initSerializer(2)
+
+      addReusableResettableExternalBuffer(buffer, externalBufferMemorySize, serializer)
+      ctx.addReusableCloseStatement(s"$buffer.close();")
+
+      val iterTerm = classOf[ResettableExternalBuffer#BufferIterator].getCanonicalName
+      ctx.addReusableMember(s"$iterTerm $iter = null;")
+    }
+
+    val condExpr = exprGenerator.generateExpression(condition)
+
+    val buildRowSer = ctx.addReusableTypeSerializer(if (leftIsBuild) leftType else rightType)
+
+    val buildProcessCode = if (singleRowJoin) {
+      s"this.$buildRow = ($BASE_ROW) $buildRowSer.copy($buildRow);"
+    } else {
+      s"$buffer.add(($BASE_ROW) $buildRow);"
+    }
+
+    var (probeProcessCode, buildEndCode, probeEndCode) =
+      if (joinType == FlinkJoinType.SEMI || joinType == FlinkJoinType.ANTI) {
+        genSemiJoinProcessAndEndCode(condExpr, iter, buffer)
+      } else {
+        genJoinProcessAndEndCode(condExpr, iter, buffer)
+      }
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd = false;")
+    buildEndCode =
+        (if (singleRowJoin) buildEndCode else s"$buffer.complete(); \n $buildEndCode") +
+            s"\n $buildEnd = true;"
+
+    // the build side may be either the first or the second input
+    val (processCode1, endInputCode1, processCode2, endInputCode2)  =
+      if (leftIsBuild) {
+        (buildProcessCode, buildEndCode, probeProcessCode, probeEndCode)
+      } else {
+        (probeProcessCode, probeEndCode, buildProcessCode, buildEndCode)
+      }
+
+    // generate the operator
+    val genOp = OperatorCodeGenerator.generateTwoInputStreamOperator[BaseRow, BaseRow, BaseRow](
+      ctx,
+      "BatchNestedLoopJoin",
+      processCode1,
+      endInputCode1,
+      processCode2,
+      endInputCode2,
+      s"""
+         |if ($buildEnd) {
+         |  return $INPUT_SELECTION.${if (leftIsBuild) "SECOND" else "FIRST"};
+         |} else {
+         |  return $INPUT_SELECTION.${if (leftIsBuild) "FIRST" else "SECOND"};
+         |}
+       """.stripMargin,
+      leftType,
+      rightType)
+    new CodeGenOperatorFactory[BaseRow](genOp)
+  }
+
+  /**
+    * Deal with inner join, left outer join, right outer join and full outer join.
+    */
+  private def genJoinProcessAndEndCode(
+      condExpr: GeneratedExpression, iter: String, buffer: String): (String, String, String) = {
+    val joinedRowTerm = newName("joinedRow")
+    def joinedRow(row1: String, row2: String): String = {
+      s"$joinedRowTerm.replace($row1, $row2)"
+    }
+
+    val buildMatched = newName("buildMatched")
+    val probeMatched = newName("probeMatched")
+    val buildNullRow = newName("buildNullRow")
+    val probeNullRow = newName("probeNullRow")
+
+    val isFull = joinType == FlinkJoinType.FULL
+    val probeOuter = joinType.isOuter
+
+    ctx.addReusableOutputRecord(outputType, classOf[JoinedRow], joinedRowTerm)
+    ctx.addReusableNullRow(buildNullRow, buildArity)
+
+    val bitSetTerm = classOf[util.BitSet].getCanonicalName
+    if (isFull) {
+      ctx.addReusableNullRow(probeNullRow, probeArity)
+      if (singleRowJoin) {
+        ctx.addReusableMember(s"boolean $buildMatched = false;")
+      } else {
+        // BitSet is slower than boolean[].
+        // We could use boolean[] instead when the number of records is small.
+        ctx.addReusableMember(s"$bitSetTerm $buildMatched = null;")
+      }
+    }
+
+    val probeOuterCode =
+      s"""
+         |if (!$probeMatched) {
+         |  ${generateCollect(
+              if (leftIsBuild)
+                joinedRow(buildNullRow, probeRow)
+              else
+                joinedRow(probeRow, buildNullRow))}
+         |}
+            """.stripMargin
+
+    val iterCnt = newName("iteratorCount")
+    val joinBuildAndProbe = {
+      s"""
+         |${ctx.reusePerRecordCode()}
+         |${ctx.reuseInputUnboxingCode(buildRow)}
+         |${condExpr.code}
+         |if (${condExpr.resultTerm}) {
+         |  ${generateCollect(joinedRow(DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM))}
+         |
+         |  // set probe outer matched flag
+         |  ${if (probeOuter) s"$probeMatched = true;" else ""}
+         |
+         |  // set build outer matched flag
+         |  ${if (singleRowJoin) {
+                if (isFull) s"$buildMatched = true;" else ""
+              } else {
+                if (isFull) s"$buildMatched.set($iterCnt);" else ""
+              }
+            }
+         |}
+         |""".stripMargin
+    }
+
+    val goJoin = if (singleRowJoin) {
+      s"""
+         |if ($buildRow != null) {
+         |  $joinBuildAndProbe
+         |}
+       """.stripMargin
+    } else {
+      s"""
+         |${resetIterator(iter, buffer)}
+         |${if (isFull) s"int $iterCnt = -1;" else ""}
+         |while ($iter.advanceNext()) {
+         |  ${if (isFull) s"$iterCnt++;" else ""}
+         |  $BINARY_ROW $buildRow = $iter.getRow();
+         |  $joinBuildAndProbe
+         |}
+         |""".stripMargin
+    }
+
+    val processCode =
+      s"""
+         |${if (probeOuter) s"boolean $probeMatched = false;" else ""}
+         |${ctx.reuseInputUnboxingCode(probeRow)}
+         |$goJoin
+         |${if (probeOuter) probeOuterCode else ""}
+         |""".stripMargin
+
+    val buildEndCode =
+        s"""
+           |LOG.info("Finish build phase.");
+           |${
+              if (!singleRowJoin && isFull) {
+                s"$buildMatched = new $bitSetTerm($buffer.size());"
+              } else {
+                ""
+              }
+             }
+           |""".stripMargin
+
+    val buildOuterEmit = generateCollect(
+      if (leftIsBuild) joinedRow(buildRow, probeNullRow) else joinedRow(probeNullRow, buildRow))
+
+    var probeEndCode = if (isFull) {
+      if (singleRowJoin) {
+        s"""
+           |if ($buildRow != null && !$buildMatched) {
+           |  $buildOuterEmit
+           |}
+         """.stripMargin
+      } else {
+        val iterCnt = newName("iteratorCount")
+        s"""
+           |${resetIterator(iter, buffer)}
+           |int $iterCnt = -1;
+           |while ($iter.advanceNext()) {
+           |  $iterCnt++;
+           |  $BINARY_ROW $buildRow = $iter.getRow();
+           |  if (!$buildMatched.get($iterCnt)) {
+           |    $buildOuterEmit
+           |  }
+           |}
+           |""".stripMargin
+      }
+    } else {
+      ""
+    }
+
+    probeEndCode =
+        s"""
+           |$probeEndCode
+           |LOG.info("Finish probe phase.");
+       """.stripMargin
+
+    (processCode, buildEndCode, probeEndCode)
+  }
+
+  /**
+    * Deal with semi join and anti join.
+    */
+  private def genSemiJoinProcessAndEndCode(
+      condExpr: GeneratedExpression, iter: String, buffer: String): (String, String, String) = {
+    val probeMatched = newName("probeMatched")
+    val goJoin = if (singleRowJoin) {
+      s"""
+         |if ($buildRow != null) {
+         |  ${ctx.reusePerRecordCode()}
+         |  ${ctx.reuseInputUnboxingCode(buildRow)}
+         |  ${condExpr.code}
+         |  if (${condExpr.resultTerm}) {
+         |    $probeMatched = true;
+         |  }
+         |}
+         |""".stripMargin
+    } else {
+      s"""
+         |${resetIterator(iter, buffer)}
+         |while ($iter.advanceNext()) {
+         |  $BINARY_ROW $buildRow = $iter.getRow();
+         |  ${ctx.reusePerRecordCode()}
+         |  ${ctx.reuseInputUnboxingCode(buildRow)}
+         |  ${condExpr.code}
+         |  if (${condExpr.resultTerm}) {
+         |    $probeMatched = true;
+         |    break;
+         |  }
+         |}
+         |""".stripMargin
+    }
+
+    (s"""
+        |boolean $probeMatched = false;
+        |${ctx.reuseInputUnboxingCode(probeRow)}
+        |$goJoin
+        |if (${if (joinType == FlinkJoinType.ANTI) "!" else ""}$probeMatched) {
+        |  ${generateCollect(probeRow)}
+        |}
+        |""".stripMargin, "", "")
+  }
+
+  /**
+    * Resets the iterator, or creates a new one.
+    */
+  def resetIterator(iter: String, buffer: String): String = {
+    s"""
+       |if ($iter == null) {
+       |  $iter = $buffer.newIterator();
+       |} else {
+       |  $iter.reset();
+       |}
+       |""".stripMargin
+  }
+
+  def addReusableResettableExternalBuffer(
+      fieldTerm: String, memSize: Long, serializer: String): Unit = {
+    val memManager = "getContainingTask().getEnvironment().getMemoryManager()"
+    val ioManager = "getContainingTask().getEnvironment().getIOManager()"
+
+    val open =
+      s"""
+         |$fieldTerm = new ${className[ResettableExternalBuffer]}(
+         |  $memManager,
+         |  $ioManager,
+         |  $memManager.allocatePages(
+         |    getContainingTask(), ((int) $memSize) / $memManager.getPageSize()),
+         |  $serializer,
+         |  false);
+         |""".stripMargin
+    ctx.addReusableMember(s"${className[ResettableExternalBuffer]} $fieldTerm = null;")
+    ctx.addReusableOpenStatement(open)
+  }
+}
+
+
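
The generated operator above works in two phases: during the build phase it only collects rows (a single cached row for singleRowJoin, otherwise a ResettableExternalBuffer that can spill to disk), and once the build input finishes it sets the buildEnd flag so the generated input selection switches to the probe input. A rough hand-written sketch of that control flow, assuming an in-memory ArrayBuffer in place of the spillable buffer; the class and method names here are illustrative, not the generated code.

    import scala.collection.mutable.ArrayBuffer

    // Illustrative sketch of the build/probe phases and the input-selection flag.
    class TwoPhaseNestedLoopJoin[B, P, O](join: (B, P) => Option[O]) {
      private val buildRows = ArrayBuffer.empty[B]
      private var buildEnd = false

      def processBuild(row: B): Unit = { buildRows += row }   // build phase: buffer only
      def endBuild(): Unit = { buildEnd = true }               // flip to the probe input
      def probeInputSelected: Boolean = buildEnd               // mirrors the generated input selection
      def processProbe(row: P): Iterator[O] =                  // probe phase: scan all buffered rows
        buildRows.iterator.flatMap(b => join(b, row).iterator)
    }
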
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index f4b540c..8be5cba 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -18,20 +18,25 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
+import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGeneratorContext, NestedLoopJoinCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
+import org.apache.flink.table.plan.nodes.ExpressionFormat
 import org.apache.flink.table.plan.nodes.exec.ExecNode
-import org.apache.flink.table.typeutils.BinaryRowSerializer
+import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryRowSerializer}
+
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
-import java.util
 
-import org.apache.flink.api.dag.Transformation
+import java.util
 
 import scala.collection.JavaConversions._
 
@@ -124,7 +129,44 @@ class BatchExecNestedLoopJoin(
 
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): Transformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val lInput = getInputNodes.get(0).translateToPlan(tableEnv)
+        .asInstanceOf[Transformation[BaseRow]]
+    val rInput = getInputNodes.get(1).translateToPlan(tableEnv)
+        .asInstanceOf[Transformation[BaseRow]]
+
+    // get type
+    val lType = lInput.getOutputType.asInstanceOf[BaseRowTypeInfo].toRowType
+    val rType = rInput.getOutputType.asInstanceOf[BaseRowTypeInfo].toRowType
+    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val op = new NestedLoopJoinCodeGenerator(
+      CodeGeneratorContext(tableEnv.getConfig),
+      singleRowJoin,
+      leftIsBuild,
+      lType,
+      rType,
+      outputType,
+      flinkJoinType,
+      condition
+    ).gen()
+
+    new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+      lInput,
+      rInput,
+      getOperatorName,
+      op,
+      BaseRowTypeInfo.of(outputType),
+      getResource.getParallelism)
+  }
+
+  private def getOperatorName: String = {
+    val joinExpressionStr = if (getCondition != null) {
+      val inFields = inputRowType.getFieldNames.toList
+      s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
+    } else {
+      ""
+    }
+    s"NestedLoopJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
index def6c29..bae54c1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData.numericType
@@ -29,7 +28,6 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import scala.collection.Seq
 
-
 class Limit0RemoveITCase extends BatchTestBase {
 
   @Before
@@ -75,17 +73,15 @@ class Limit0RemoveITCase extends BatchTestBase {
     checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null)))
   }
 
-  @Test(expected = classOf[TableException])
-  // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+  @Test
   def testLimitRemoveWithExists(): Unit = {
     val sqlQuery = "SELECT * FROM t1 WHERE EXISTS (SELECT a FROM t2 LIMIT 0)"
     checkResult(sqlQuery, Seq())
   }
 
-  @Test(expected = classOf[TableException])
-  // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+  @Test
   def testLimitRemoveWithNotExists(): Unit = {
-    val sqlQuery = "SELECT * FROM t1 WHERE NOT EXISTS (SELECT a FROM t2 LIMIT 0)"
+    val sqlQuery = "SELECT a FROM t1 WHERE NOT EXISTS (SELECT a FROM t2 LIMIT 0)"
     checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null)))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
index 0b78c95..fd430d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.batch.sql
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableConfigOptions, TableException}
+import org.apache.flink.table.api.TableConfigOptions
 import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper
 import org.apache.flink.table.runtime.batch.sql.join.JoinType.SortMergeJoin
 import org.apache.flink.table.runtime.utils.BatchTestBase
@@ -511,7 +511,8 @@ class MiscITCase extends BatchTestBase {
     )
   }
 
-  @Test(expected = classOf[TableException])
+  @Ignore // TODO support lazy from source
+  @Test
   def testCompareFunctionWithSubquery(): Unit = {
     checkResult("SELECT " +
         "b IN (3, 4, 5)," +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateRemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateRemoveITCase.scala
index 79ffc57..a3f3b60 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateRemoveITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateRemoveITCase.scala
@@ -79,12 +79,10 @@ class AggregateRemoveITCase extends BatchTestBase {
       Seq(row(1, 2, 3))
     )
 
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a) FROM T3 WHERE 1=2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
+    checkResult(
+      "SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a) FROM T3 WHERE 1=2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
 
     checkResult(
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
index 9efecbc..3815d6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
@@ -66,19 +66,15 @@ class PruneAggregateCallITCase extends BatchTestBase {
       Seq(row(1))
     )
 
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*) FROM MyTable2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
-
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*) FROM MyTable2 WHERE 1=2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*) FROM MyTable2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
+
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*) FROM MyTable2 WHERE 1=2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
 
     checkResult(
       "SELECT 1 FROM (SELECT SUM(a), COUNT(*) FROM MyTable) t",
@@ -100,33 +96,25 @@ class PruneAggregateCallITCase extends BatchTestBase {
       Seq(row(1))
     )
 
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT SUM(a), COUNT(*) FROM MyTable2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
-
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*), SUM(a) FROM MyTable2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
-
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT SUM(a), COUNT(*) FROM MyTable2 WHERE 1=2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
-
-    // TODO enable this case after translateToPlanInternal method is implemented
-    //  in BatchExecNestedLoopJoin
-    // checkResult(
-    //   "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*), SUM(a) FROM MyTable2 WHERE 1=2)",
-    //   Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
-    // )
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT SUM(a), COUNT(*) FROM MyTable2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
+
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*), SUM(a) FROM MyTable2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
+
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT SUM(a), COUNT(*) FROM MyTable2 WHERE 1=2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
+
+    checkResult(
+      "SELECT * FROM MyTable WHERE EXISTS (SELECT COUNT(*), SUM(a) FROM MyTable2 WHERE 1=2)",
+      Seq(row(1, 1, "Hi"), row(2, 2, "Hello"), row(3, 2, "Hello world"))
+    )
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
index cfc1828..14da21c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -22,23 +22,24 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableConfigOptions
 import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper.disableOtherJoinOpForJoin
-import org.apache.flink.table.runtime.batch.sql.join.JoinType.{JoinType, NestedLoopJoin, SortMergeJoin}
+import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
 import org.apache.flink.table.typeutils.BigDecimalTypeInfo
 
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
 import java.math.{BigDecimal => JBigDecimal}
+import java.util
 
 import scala.collection.Seq
 import scala.util.Random
 
-// @RunWith(classOf[Parameterized]) TODO
-class InnerJoinITCase extends BatchTestBase {
-
-  val expectedJoinType: JoinType = JoinType.SortMergeJoin
+@RunWith(classOf[Parameterized])
+class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
 
   private lazy val myUpperCaseData = Seq(
     row(1, "A"),
@@ -154,8 +155,7 @@ class InnerJoinITCase extends BatchTestBase {
   def testBigForSpill(): Unit = {
 
     conf.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM, 1)
-    //TODO ensure hash join spilled
-//    conf.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM, 2)
+    conf.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM, 2)
     tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
 
     val bigData = Random.shuffle(
@@ -188,3 +188,12 @@ class InnerJoinITCase extends BatchTestBase {
     }
   }
 }
+
+object InnerJoinITCase {
+  @Parameterized.Parameters(name = "{0}")
+  def parameters(): util.Collection[Array[_]] = {
+    util.Arrays.asList(
+      Array(BroadcastHashJoin), Array(HashJoin), Array(SortMergeJoin), Array(NestedLoopJoin))
+  }
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
index b3f29f6..8a7d59f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
@@ -421,6 +421,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     }
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testFullOuterJoinWithoutEqualCond(): Unit = {
     if (expectedJoinType == NestedLoopJoin) {
@@ -433,6 +434,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     }
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testSingleRowFullOuterJoinWithoutEqualCond(): Unit = {
     if (expectedJoinType == NestedLoopJoin) {
@@ -445,6 +447,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     }
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testSingleRowFullOuterJoinWithoutEqualCondNoMatch(): Unit = {
     if (expectedJoinType == NestedLoopJoin) {
@@ -601,6 +604,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
       row(2, 1.0) :: row(2, 1.0) :: Nil)
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testJoinWithNull(): Unit = {
     // TODO enable all
@@ -642,6 +646,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     }
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testSingleRowJoin(): Unit = {
     if (expectedJoinType == NestedLoopJoin) {
@@ -683,6 +688,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     }
   }
 
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
   @Test
   def testNonEmptyTableJoinEmptyTable(): Unit = {
     if (expectedJoinType == NestedLoopJoin) {
@@ -813,7 +819,8 @@ object JoinITCase {
     util.Arrays.asList(
       Array(BroadcastHashJoin),
       Array(HashJoin),
-      Array(SortMergeJoin))
+      Array(SortMergeJoin),
+      Array(NestedLoopJoin))
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinWithoutKeyITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinWithoutKeyITCase.scala
new file mode 100644
index 0000000..415863f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinWithoutKeyITCase.scala
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql.join
+
+import org.apache.flink.table.api.{PlannerConfigOptions, TableConfigOptions}
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+
+import org.junit.{Before, Ignore, Test}
+
+import scala.collection.Seq
+
+class JoinWithoutKeyITCase extends BatchTestBase {
+
+  @Before
+  def before(): Unit = {
+    tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 3)
+    registerCollection("SmallTable3", smallData3, type3, "a, b, c", nullablesOfSmallData3)
+    registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+    registerCollection("Table5", data5, type5, "d, e, f, g, h", nullablesOfData5)
+    registerCollection("NullTable3", nullData3, type3, "a, b, c", nullablesOfNullData3)
+    registerCollection("NullTable5", nullData5, type5, "d, e, f, g, h", nullablesOfNullData5)
+    registerCollection("l", data2_3, INT_DOUBLE, "a, b", nullablesOfData2_3)
+    registerCollection("r", data2_2, INT_DOUBLE, "c, d")
+
+    registerCollection("testData", intStringData, INT_STRING, "a, b", nullablesOfIntStringData)
+    registerCollection("testData2", intIntData2, INT_INT, "c, d", nullablesOfIntIntData2)
+    registerCollection("testData3", intIntData3, INT_INT, "e, f", nullablesOfIntIntData3)
+    registerCollection("leftT", SemiJoinITCase.leftT, INT_DOUBLE, "a, b")
+    registerCollection("rightT", SemiJoinITCase.rightT, INT_DOUBLE, "c, d")
+  }
+
+  // single row join
+
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
+  @Test
+  def testCrossJoinWithLeftSingleRowInput(): Unit = {
+    checkResult(
+      "SELECT * FROM (SELECT count(*) FROM SmallTable3) CROSS JOIN SmallTable3",
+      Seq(
+        row(3, 1, 1, "Hi"),
+        row(3, 2, 2, "Hello"),
+        row(3, 3, 2, "Hello world")
+      ))
+  }
+
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
+  @Test
+  def testCrossJoinWithRightSingleRowInput(): Unit = {
+    checkResult(
+      "SELECT * FROM SmallTable3 CROSS JOIN (SELECT count(*) FROM SmallTable3)",
+      Seq(
+        row(1, 1, "Hi", 3),
+        row(2, 2, "Hello", 3),
+        row(3, 2, "Hello world", 3)
+      ))
+  }
+
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
+  @Test
+  def testCrossJoinWithEmptySingleRowInput(): Unit = {
+    checkResult(
+      "SELECT * FROM SmallTable3 CROSS JOIN (SELECT count(*) FROM SmallTable3 HAVING count(*) < 0)",
+      Seq())
+  }
+
+  @Test
+  def testLeftNullRightJoin(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM SmallTable3) " +
+          "WHERE cnt < 0) RIGHT JOIN Table5 ON d < cnt",
+      Seq(
+        row(1, null),
+        row(2, null), row(2, null),
+        row(3, null), row(3, null), row(3, null),
+        row(4, null), row(4, null), row(4, null), row(4, null),
+        row(5, null), row(5, null), row(5, null), row(5, null), row(5, null)
+      ))
+  }
+
+  @Test
+  def testLeftSingleRightJoinEqualPredicate(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM (SELECT COUNT(*) AS cnt FROM SmallTable3) RIGHT JOIN Table5 ON cnt = d",
+      Seq(
+        row(1, null),
+        row(2, null), row(2, null),
+        row(3, 3), row(3, 3), row(3, 3),
+        row(4, null), row(4, null), row(4, null), row(4, null),
+        row(5, null), row(5, null), row(5, null), row(5, null), row(5, null)
+      ))
+  }
+
+  @Test
+  def testSingleJoinWithReusePerRecordCode(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM (SELECT COUNT(*) AS cnt FROM SmallTable3) " +
+          "RIGHT JOIN Table5 ON d = UNIX_TIMESTAMP(cast(CURRENT_TIMESTAMP as VARCHAR))",
+      Seq(
+        row(1, null),
+        row(2, null), row(2, null),
+        row(3, null), row(3, null), row(3, null),
+        row(4, null), row(4, null), row(4, null), row(4, null),
+        row(5, null), row(5, null), row(5, null), row(5, null), row(5, null)
+      ))
+  }
+
+  @Test
+  def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM (SELECT COUNT(*) AS cnt FROM SmallTable3) RIGHT JOIN Table5 ON cnt > d",
+      Seq(
+        row(1, 3),
+        row(2, 3), row(2, 3),
+        row(3, null), row(3, null), row(3, null),
+        row(4, null), row(4, null), row(4, null), row(4, null),
+        row(5, null), row(5, null), row(5, null), row(5, null), row(5, null)
+      ))
+  }
+
+  @Test
+  def testRightNullLeftJoin(): Unit = {
+    checkResult(
+      "SELECT a, cnt FROM SmallTable3 LEFT JOIN (SELECT cnt FROM " +
+          "(SELECT COUNT(*) AS cnt FROM Table5) WHERE cnt < 0) ON cnt > a",
+      Seq(
+        row(1, null), row(2, null), row(3, null)
+      ))
+  }
+
+  @Test
+  def testRightSingleLeftJoinEqualPredicate(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM Table5 LEFT JOIN (SELECT COUNT(*) AS cnt FROM SmallTable3) ON cnt = d",
+      Seq(
+        row(1, null),
+        row(2, null), row(2, null),
+        row(3, 3), row(3, 3), row(3, 3),
+        row(4, null), row(4, null), row(4, null), row(4, null),
+        row(5, null), row(5, null), row(5, null), row(5, null), row(5, null)
+      ))
+  }
+
+  @Test
+  def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+    checkResult(
+      "SELECT d, cnt FROM Table5 LEFT JOIN (SELECT COUNT(*) AS cnt FROM SmallTable3) ON cnt < d",
+      Seq(
+        row(1, null),
+        row(2, null), row(2, null),
+        row(3, null), row(3, null), row(3, null),
+        row(4, 3), row(4, 3), row(4, 3), row(4, 3),
+        row(5, 3), row(5, 3), row(5, 3), row(5, 3), row(5, 3)
+      ))
+  }
+
+  @Test
+  def testRightSingleLeftJoinTwoFields(): Unit = {
+    checkResult(
+      "SELECT d, cnt, cnt2 FROM Table5 LEFT JOIN " +
+          "(SELECT COUNT(*) AS cnt,COUNT(*) AS cnt2 FROM SmallTable3 ) AS x ON d = cnt",
+      Seq(
+        row(1, null, null),
+        row(2, null, null), row(2, null, null),
+        row(3, 3, 3), row(3, 3, 3), row(3, 3, 3),
+        row(4, null, null), row(4, null, null), row(4, null, null), row(4, null, null),
+        row(5, null, null), row(5, null, null), row(5, null, null), row(5, null, null),
+        row(5, null, null)
+      ))
+  }
+
+  // inner/cross/outer join
+
+  @Test
+  def testCrossJoin(): Unit = {
+    checkResult(
+      "SELECT c, g FROM NullTable3 CROSS JOIN NullTable5 where a = 3 and e > 13",
+      Seq(
+        row("Hello world", "JKL"),
+        row("Hello world", "KLM"),
+        row("Hello world", "NullTuple"),
+        row("Hello world", "NullTuple")))
+  }
+
+  @Test
+  def testInnerJoin(): Unit = {
+    checkResult(
+      "SELECT c, g FROM NullTable3, NullTable5 WHERE b > e AND b < 3",
+      Seq(row("Hello", "Hallo"), row("Hello world", "Hallo")))
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    checkResult(
+      "SELECT c, g FROM NullTable3 LEFT JOIN NullTable5 ON a > d + 10 where a = 3 OR a = 12",
+      Seq(row("Comment#6", "Hallo"), row("Hello world", "null")))
+  }
+
+  @Test
+  def testRightJoin(): Unit = {
+    checkResult(
+      "SELECT c, g FROM NullTable5 RIGHT JOIN NullTable3 ON a > d + 10 where a = 3 OR a = 12",
+      Seq(row("Comment#6", "Hallo"), row("Hello world", "null")))
+  }
+
+  @Test
+  def testFullJoin(): Unit = {
+    checkResult(
+      "SELECT * FROM (SELECT c, g FROM NullTable5 FULL JOIN NullTable3 ON a > d) " +
+          "WHERE c is null or g is null",
+      Seq(
+        row("Hi", null), row("NullTuple", null), row("NullTuple", null),
+        row(null, "NullTuple"), row(null, "NullTuple")
+      ))
+  }
+
+  @Test
+  def testUncorrelatedExist(): Unit = {
+    checkResult(
+      "select * from l where exists (select * from r where c > 0)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0))
+    )
+
+    checkResult(
+      "select * from l where exists (select * from r where c < 0)",
+      Seq()
+    )
+  }
+
+  @Test
+  def testUncorrelatedNotExist(): Unit = {
+    checkResult(
+      "select * from l where not exists (select * from r where c < 0)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0))
+    )
+
+    checkResult(
+      "select * from l where not exists (select * from r where c > 0)",
+      Seq()
+    )
+  }
+
+  @Test
+  def testCartesianJoin(): Unit = {
+    val data = Seq(row(1, null), row(2, 2))
+    registerCollection("T1", data, INT_INT, "a, b")
+    registerCollection("T2", data, INT_INT, "c, d")
+
+    checkResult(
+      "select * from T1 JOIN T2 ON true",
+      Seq(row(1, null, 1, null), row(1, null, 2, 2), row(2, 2, 1, null), row(2, 2, 2, 2))
+    )
+
+    checkResult(
+      "select * from T1 JOIN T2 ON a > c",
+      Seq(row(2, 2, 1, null))
+    )
+
+  }
+
+  @Test
+  def testInner(): Unit = {
+    checkResult(
+      """
+          SELECT b, c, d FROM testData, testData2 WHERE a = 2
+        """.stripMargin,
+      row("2", 1, 1) ::
+          row("2", 1, 2) ::
+          row("2", 2, 1) ::
+          row("2", 2, 2) ::
+          row("2", 3, 1) ::
+          row("2", 3, 2) :: Nil)
+
+    checkResult(
+      """
+          SELECT b, c, d FROM testData, testData2 WHERE a < c
+        """.stripMargin,
+      row("1", 2, 1) ::
+          row("1", 2, 2) ::
+          row("1", 3, 1) ::
+          row("1", 3, 2) ::
+          row("2", 3, 1) ::
+          row("2", 3, 2) :: Nil)
+
+    checkResult(
+      """
+          SELECT b, c, d FROM testData JOIN testData2 ON a < c
+        """.stripMargin,
+      row("1", 2, 1) ::
+          row("1", 2, 2) ::
+          row("1", 3, 1) ::
+          row("1", 3, 2) ::
+          row("2", 3, 1) ::
+          row("2", 3, 2) :: Nil)
+  }
+
+  @Test
+  def testInnerExpr(): Unit = {
+    checkResult(
+      "SELECT * FROM testData2, testData3 WHERE c - e = 0",
+      Seq(
+        row(1, 1, 1, null),
+        row(1, 2, 1, null),
+        row(2, 1, 2, 2),
+        row(2, 2, 2, 2)
+      ))
+
+    checkResult(
+      "SELECT * FROM testData2, testData3 WHERE c - e = 1",
+      Seq(
+        row(2, 1, 1, null),
+        row(2, 2, 1, null),
+        row(3, 1, 2, 2),
+        row(3, 2, 2, 2)
+      ))
+  }
+
+  @Test
+  def testNonKeySemi(): Unit = {
+    checkResult(
+      "SELECT * FROM testData3 WHERE EXISTS (SELECT * FROM testData2)",
+      row(1, null) :: row(2, 2) :: Nil)
+    checkResult(
+      "SELECT * FROM testData3 WHERE NOT EXISTS (SELECT * FROM testData2)",
+      Nil)
+    checkResult(
+      """
+        |SELECT e FROM testData3
+        |WHERE
+        |  EXISTS (SELECT * FROM testData)
+        |OR
+        |  EXISTS (SELECT * FROM testData2)""".stripMargin,
+      row(1) :: row(2) :: Nil)
+    checkResult(
+      """
+        |SELECT a FROM testData
+        |WHERE
+        |  a IN (SELECT c FROM testData2)
+        |OR
+        |  a IN (SELECT e FROM testData3)""".stripMargin,
+      row(1) :: row(2) :: row(3) :: Nil)
+  }
+
+  @Test
+  def testComposedNonEqualConditionLeftAnti(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a < c AND b < d)",
+      Seq(row(3, 3.0), row(6, null), row(null, 5.0), row(null, null)))
+  }
+
+  @Test
+  def testComposedNonEqualConditionLeftSemi(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a < c AND b < d)",
+      Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0)))
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/OuterJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/OuterJoinITCase.scala
index 6c0debf..aae01fa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/OuterJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/OuterJoinITCase.scala
@@ -19,19 +19,21 @@
 package org.apache.flink.table.runtime.batch.sql.join
 
 import org.apache.flink.table.api.TableConfigOptions
-import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, JoinType, NestedLoopJoin}
+import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
 
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
-import scala.collection.Seq
+import java.util
 
-//@RunWith(classOf[Parameterized]) TODO
-class OuterJoinITCase extends BatchTestBase {
+import scala.collection.Seq
 
-  val expectedJoinType: JoinType = JoinType.SortMergeJoin
+@RunWith(classOf[Parameterized])
+class OuterJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
 
   private lazy val leftT = Seq(
     row(1, 2.0),
@@ -383,3 +385,11 @@ class OuterJoinITCase extends BatchTestBase {
     }
   }
 }
+
+object OuterJoinITCase {
+  @Parameterized.Parameters(name = "{0}")
+  def parameters(): util.Collection[Array[_]] = {
+    util.Arrays.asList(
+      Array(BroadcastHashJoin), Array(HashJoin), Array(SortMergeJoin), Array(NestedLoopJoin))
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/ScalarQueryITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/ScalarQueryITCase.scala
new file mode 100644
index 0000000..a56f42f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/ScalarQueryITCase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql.join
+
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+
+import org.junit.{Before, Test}
+
+import scala.collection.Seq
+
+class ScalarQueryITCase extends BatchTestBase {
+
+  lazy val l = Seq(
+    row(1, 2.0),
+    row(1, 2.0),
+    row(2, 1.0),
+    row(2, 1.0),
+    row(3, 3.0),
+    row(null, null),
+    row(null, 5.0),
+    row(6, null)
+  )
+
+  lazy val r = Seq(
+    row(2, 3.0),
+    row(2, 3.0),
+    row(3, 2.0),
+    row(4, 1.0),
+    row(null, null),
+    row(null, 5.0),
+    row(6, null)
+  )
+
+
+
+}
+
+
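
The SemiJoinITCase added below exercises the generator's SEMI/ANTI path (genSemiJoinProcessAndEndCode), which only ever emits probe-side rows. As a reminder of the semantics those tests check, here is a hand-written sketch over in-memory sequences; it is illustrative only, not the generated code, and it sidesteps the NULL handling that NOT IN requires.

    // EXISTS / IN style semi join: keep probe rows that have at least one match.
    def semiJoin[P, B](probe: Seq[P], build: Seq[B])(cond: (P, B) => Boolean): Seq[P] =
      probe.filter(p => build.exists(b => cond(p, b)))

    // NOT EXISTS style anti join: keep probe rows that have no match at all.
    def antiJoin[P, B](probe: Seq[P], build: Seq[B])(cond: (P, B) => Boolean): Seq[P] =
      probe.filterNot(p => build.exists(b => cond(p, b)))
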
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/SemiJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/SemiJoinITCase.scala
new file mode 100644
index 0000000..904c620
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/SemiJoinITCase.scala
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql.join
+
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
+import org.apache.flink.table.runtime.batch.sql.join.SemiJoinITCase.leftT
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, Ignore, Test}
+
+import java.util
+
+import scala.collection.Seq
+
+@RunWith(classOf[Parameterized])
+class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
+
+  @Before
+  def before(): Unit = {
+    tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 3)
+    registerCollection("leftT", leftT, INT_DOUBLE, "a, b")
+    registerCollection("rightT", SemiJoinITCase.rightT, INT_DOUBLE, "c, d")
+    registerCollection("rightUniqueKeyT", SemiJoinITCase.rightUniqueKeyT, INT_DOUBLE, "c, d")
+    JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, expectedJoinType)
+  }
+
+  @Test
+  def testSingleConditionLeftSemi(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)",
+      Seq(row(2, 1.0), row(2, 1.0), row(3, 3.0), row(6, null)))
+  }
+
+  @Test
+  def testComposedConditionLeftSemi(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b < d)",
+      Seq(row(2, 1.0), row(2, 1.0)))
+  }
+
+  @Test
+  def testSingleConditionLeftAnti(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c)",
+      Seq(row(1, 2.0), row(1, 2.0), row(null, null), row(null, 5.0)))
+  }
+
+  @Test
+  def testSingleUniqueConditionLeftAnti(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS " +
+          "(SELECT * FROM (SELECT DISTINCT c FROM rightT) WHERE a = c)",
+      Seq(row(1, 2.0), row(1, 2.0), row(null, null), row(null, 5.0)))
+  }
+
+  @Test
+  def testComposedConditionLeftAnti(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b < d)",
+      Seq(row(1, 2.0), row(1, 2.0), row(3, 3.0), row(6, null), row(null, 5.0), row(null, null)))
+  }
+
+  @Test
+  def testComposedUniqueConditionLeftAnti(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightUniqueKeyT WHERE a = c AND b < d)",
+      Seq(row(1, 2.0), row(1, 2.0), row(3, 3.0), row(null, null), row(null, 5.0), row(6, null)))
+  }
+
+  @Test
+  def testSemiJoinTranspose(): Unit = {
+    checkResult("SELECT a, b FROM " +
+        "(SELECT a, b, c FROM leftT, rightT WHERE a = c) lr " +
+        "WHERE lr.a > 0 AND lr.c IN (SELECT c FROM rightUniqueKeyT WHERE d > 1)",
+      Seq(row(2, 1.0), row(2, 1.0), row(2, 1.0), row(2, 1.0), row(3, 3.0))
+    )
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi1(): Unit = {
+    checkResult(
+      "SELECT * FROM (SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)) T WHERE T.b > 2",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi2(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM (SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT)) T WHERE T.b > 2",
+        Seq(row(3, 3.0), row(null, 5.0)))
+    }
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi3(): Unit = {
+    checkResult(
+      "SELECT * FROM " +
+          "(SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c)) T " +
+          "WHERE T.b > 2",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi1(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b > 2)",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi2(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE b > 2)",
+        Seq(row(3, 3.0), row(null, 5.0)))
+    }
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi3(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti1(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM " +
+            "(SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE c < 3)) T " +
+            "WHERE T.b > 2",
+        Seq(row(3, 3.0)))
+    }
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti2(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM " +
+            "(SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT where c > 10)) T " +
+            "WHERE T.b > 2",
+        Seq(row(3, 3.0), row(null, 5.0)))
+    }
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti3(): Unit = {
+    checkResult(
+      "SELECT * FROM " +
+          "(SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND c < 3)) T " +
+          "WHERE T.b > 2",
+      Seq(row(3, 3.0), row(null, 5.0)))
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti4(): Unit = {
+    checkResult(
+      "SELECT * FROM " +
+          "(SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c)) T " +
+          "WHERE T.b > 2",
+      Seq(row(null, 5.0)))
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti1(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b > 2)",
+        Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0), row(null, null), row(6, null)))
+    }
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti2(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE b > 2)",
+        Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0), row(null, null), row(6, null)))
+    }
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti3(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND b > 1)",
+      Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0),
+        row(3, 3.0), row(null, null), row(6, null)))
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti4(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)",
+      Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0),
+        row(null, null), row(null, 5.0), row(6, null)))
+  }
+
+  @Test
+  def testInWithAggregate1(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT SUM(a) FROM leftT WHERE b = d)",
+      Seq(row(4, 1.0))
+    )
+  }
+
+  @Ignore // TODO: reading the same source twice is not supported until lazy_from_source is set
+  @Test
+  def testInWithAggregate2(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT t1 WHERE a IN (SELECT DISTINCT a FROM leftT t2 WHERE t1.b = t2.b)",
+      Seq(row(1, 2.0), row(1, 2.0), row(2, 1.0), row(2, 1.0), row(3, 3.0))
+    )
+  }
+
+  @Test
+  def testInWithAggregate3(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE CAST(c/2 AS BIGINT) IN (SELECT COUNT(*) FROM leftT WHERE b = d)",
+      Seq(row(2, 3.0), row(2, 3.0), row(4, 1.0))
+    )
+  }
+
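+  // IN subqueries containing an OVER window: the subquery yields running SUM(a) values
+  // per partition of b, and rightT.c is matched against those sums.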
+  @Test
+  def testInWithOver1(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT SUM(a) OVER " +
+          "(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+          "FROM leftT)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0), row(6, null))
+    )
+  }
+
+  @Test
+  def testInWithOver2(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT SUM(a) OVER" +
+          "(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+          "FROM leftT GROUP BY a, b)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(6, null))
+    )
+  }
+
+  @Test
+  def testInWithOver3(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT SUM(a) OVER " +
+          "(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+          "FROM leftT WHERE b = d)",
+      Seq(row(4, 1.0))
+    )
+  }
+
+  @Test
+  def testInWithOver4(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT SUM(a) OVER" +
+          "(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+          "FROM leftT WHERE b = d GROUP BY a, b)",
+      Seq()
+    )
+  }
+
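+  // For EXISTS, the OVER aggregation in the subquery never removes rows, so these queries
+  // reduce to a plain existence check on the correlation / filter condition.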
+  @Test
+  def testExistsWithOver1(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE EXISTS (SELECT SUM(a) OVER() FROM leftT WHERE b = d)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0), row(null, 5.0))
+    )
+  }
+
+  @Test
+  def testExistsWithOver2(): Unit = {
+    if (expectedJoinType eq NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM rightT WHERE EXISTS (SELECT SUM(a) OVER() FROM leftT WHERE b > d)",
+        Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0))
+      )
+    }
+  }
+
+  @Test
+  def testExistsWithOver3(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE EXISTS (SELECT SUM(a) OVER() FROM leftT WHERE b = d GROUP BY a)",
+      Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0), row(null, 5.0))
+    )
+  }
+
+  @Test
+  def testExistsWithOver4(): Unit = {
+    if (expectedJoinType eq NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM rightT WHERE EXISTS (SELECT SUM(a) OVER() FROM leftT WHERE b>d GROUP BY a)",
+        Seq(row(2, 3.0), row(2, 3.0), row(3, 2.0), row(4, 1.0))
+      )
+    }
+  }
+
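+  // Non-equality correlation conditions combined with IN / EXISTS; some of these plans keep
+  // an equi-join key and run for every join type, the rest are restricted to the nested
+  // loop join.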
+  @Test
+  def testInWithNonEqualityCorrelationCondition1(): Unit = {
+    checkResult(
+      "SELECT * FROM rightT WHERE c IN (SELECT a FROM leftT WHERE b > d)",
+      Seq(row(3, 2.0))
+    )
+  }
+
+  @Test
+  def testInWithNonEqualityCorrelationCondition2(): Unit = {
+    checkResult(
+      "SELECT * FROM leftT WHERE a IN " +
+          "(SELECT c FROM (SELECT MAX(c) AS c, d FROM rightT GROUP BY d) r WHERE leftT.b > r.d)",
+      Seq(row(3, 3.0))
+    )
+  }
+
+  @Test
+  def testInWithNonEqualityCorrelationCondition3(): Unit = {
+    if (expectedJoinType eq NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE a IN " +
+            "(SELECT c FROM (SELECT MIN(c) OVER() AS c, d FROM rightT) r WHERE leftT.b <> r.d)",
+        Seq(row(2, 1.0), row(2, 1.0))
+      )
+    }
+  }
+
+  @Test
+  def testInWithNonEqualityCorrelationCondition4(): Unit = {
+    if (expectedJoinType eq NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE a IN (SELECT c FROM " +
+            "(SELECT MIN(c) OVER() AS c, d FROM rightT GROUP BY c, d) r WHERE leftT.b <> r.d)",
+        Seq(row(2, 1.0), row(2, 1.0))
+      )
+    }
+  }
+
+  @Test
+  def testExistsWithNonEqualityCorrelationCondition(): Unit = {
+    if (expectedJoinType eq JoinType.NestedLoopJoin) {
+      checkResult(
+        "SELECT * FROM leftT WHERE EXISTS (SELECT c FROM rightT WHERE b > d)",
+        Seq(row(1, 2.0), row(1, 2.0), row(3, 3.0), row(null, 5.0))
+      )
+    }
+  }
+
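+  // The RewriteScalarQuery* tests use COUNT(*) comparisons that are equivalent to a simple
+  // non-emptiness check, so they are expected to be rewritten into EXISTS-style semi joins.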
+  @Test
+  def testRewriteScalarQueryWithoutCorrelation1(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT)"
+    ).foreach(checkResult(_, leftT))
+  }
+
+  @Test
+  def testRewriteScalarQueryWithoutCorrelation2(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 5) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 5) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 5) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 5) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT WHERE c > 5)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT WHERE c > 5)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT WHERE c > 5)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT WHERE c > 5)"
+    ).foreach(checkResult(_, leftT))
+  }
+
+  @Test
+  def testRewriteScalarQueryWithoutCorrelation3(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 15) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 15) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 15) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE c > 15) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT WHERE c > 15)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT WHERE c > 15)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT WHERE c > 15)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT WHERE c > 15)"
+    ).foreach(checkResult(_, Seq.empty))
+  }
+
+  @Test
+  def testRewriteScalarQueryWithCorrelation1(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT WHERE a = c)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT WHERE a = c)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT WHERE a = c)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT WHERE a = c)"
+    ).foreach(checkResult(_, Seq(row(2, 1.0), row(2, 1.0), row(3, 3.0), row(6, null))))
+  }
+
+  @Test
+  def testRewriteScalarQueryWithCorrelation2(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 5)"
+    ).foreach(checkResult(_, Seq(row(6, null))))
+  }
+
+  @Test
+  def testRewriteScalarQueryWithCorrelation3(): Unit = {
+    Seq(
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15) > 0",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15) > 0.9",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15) >= 1",
+      "SELECT * FROM leftT WHERE (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15) >= 0.1",
+      "SELECT * FROM leftT WHERE 0 < (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15)",
+      "SELECT * FROM leftT WHERE 0.99 < (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15)",
+      "SELECT * FROM leftT WHERE 1 <= (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15)",
+      "SELECT * FROM leftT WHERE 0.01 <= (SELECT COUNT(*) FROM rightT WHERE a = c AND c > 15)"
+    ).foreach(checkResult(_, Seq.empty))
+  }
+}
+
+object SemiJoinITCase {
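+  // Each test is executed once per physical batch join operator.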
+  @Parameterized.Parameters(name = "{0}-{1}")
+  def parameters(): util.Collection[Any] = {
+    util.Arrays.asList(BroadcastHashJoin, HashJoin, SortMergeJoin, NestedLoopJoin)
+  }
+
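+  // Test data. The ITCase registers these as tables leftT(a, b) and rightT(c, d)
+  // (column names inferred from the queries above); rightUniqueKeyT presumably uses the
+  // same schema but additionally declares a unique key, hence the name.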
+  lazy val leftT = Seq(
+    row(1, 2.0),
+    row(1, 2.0),
+    row(2, 1.0),
+    row(2, 1.0),
+    row(3, 3.0),
+    row(null, null),
+    row(null, 5.0),
+    row(6, null)
+  )
+
+  lazy val rightT = Seq(
+    row(2, 3.0),
+    row(2, 3.0),
+    row(3, 2.0),
+    row(4, 1.0),
+    row(null, null),
+    row(null, 5.0),
+    row(6, null)
+  )
+
+  lazy val rightUniqueKeyT = Seq(
+    row(2, 3.0),
+    row(3, 2.0),
+    row(4, 1.0),
+    row(null, 5.0),
+    row(6, null)
+  )
+}