You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/18 06:46:17 UTC

[flink] 03/04: [FLINK-26688][table-planner] Remove usages of TableConfig.nullCheck

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2eacc7373aa0fca2f2c59a72b0affb0ebda49ddf
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Wed Mar 16 12:52:46 2022 +0200

    [FLINK-26688][table-planner] Remove usages of TableConfig.nullCheck
    
    Remove usages of `TableConfig`s `nullCheck` which is used in code
    generation, since it was not actually used so far the option itself
    has been deprecated and planned to be removed in next releases.
    
    With this change, we can replace the necessicity top pass around
    `TableConfig` instead of `ReadableConfig` in various places.
---
 .../planner/plan/nodes/exec/ExecNodeConfig.java    |   2 -
 .../flink/table/planner/codegen/CodeGenUtils.scala |  49 ++++-----
 .../planner/codegen/CodeGeneratorContext.scala     |  19 +---
 .../table/planner/codegen/ExprCodeGenerator.scala  |  48 ++++-----
 .../table/planner/codegen/GenerateUtils.scala      |  42 ++------
 .../table/planner/codegen/MatchCodeGenerator.scala |  11 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   5 +-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 118 +++++----------------
 .../planner/codegen/calls/SearchOperatorGen.scala  |  21 +---
 .../planner/runtime/batch/table/JoinITCase.scala   |   2 -
 10 files changed, 82 insertions(+), 235 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
index 183ce14..2d238e1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
@@ -56,8 +56,6 @@ public final class ExecNodeConfig implements ReadableConfig {
         this.nodeConfig = nodeConfig;
         this.originalTableConfig = tableConfig;
         this.tableConfig = TableConfig.getDefault();
-        this.tableConfig.setNullCheck(tableConfig.getNullCheck());
-        this.tableConfig.setDecimalContext(tableConfig.getDecimalContext());
         this.tableConfig.addConfiguration(tableConfig.getConfiguration());
         this.tableConfig.addConfiguration((Configuration) nodeConfig);
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 683b231..21ead75 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -491,40 +491,27 @@ object CodeGenUtils {
         case Some(writer) =>
           // use writer to set field
           val writeField = binaryWriterWriteField(ctx, indexTerm, fieldTerm, writer, fieldType)
-          if (ctx.nullCheck) {
-            s"""
-               |${fieldExpr.code}
-               |if (${fieldExpr.nullTerm}) {
-               |  ${binaryWriterWriteNull(indexTerm, writer, fieldType)};
-               |} else {
-               |  $writeField;
-               |}
-             """.stripMargin
-          } else {
-            s"""
-               |${fieldExpr.code}
-               |$writeField;
-             """.stripMargin
-          }
+          s"""
+             |${fieldExpr.code}
+             |if (${fieldExpr.nullTerm}) {
+             |  ${binaryWriterWriteNull(indexTerm, writer, fieldType)};
+             |} else {
+             |  $writeField;
+             |}
+           """.stripMargin
 
         case None =>
           // directly set field to BinaryRowData, this depends on all the fields are fixed length
           val writeField = binaryRowFieldSetAccess(indexTerm, rowTerm, fieldType, fieldTerm)
-          if (ctx.nullCheck) {
-            s"""
-               |${fieldExpr.code}
-               |if (${fieldExpr.nullTerm}) {
-               |  ${binaryRowSetNull(indexTerm, rowTerm, fieldType)};
-               |} else {
-               |  $writeField;
-               |}
-             """.stripMargin
-          } else {
-            s"""
-               |${fieldExpr.code}
-               |$writeField;
-             """.stripMargin
-          }
+
+          s"""
+             |${fieldExpr.code}
+             |if (${fieldExpr.nullTerm}) {
+             |  ${binaryRowSetNull(indexTerm, rowTerm, fieldType)};
+             |} else {
+             |  $writeField;
+             |}
+           """.stripMargin
       }
     } else if (rowClass == classOf[GenericRowData] || rowClass == classOf[BoxedWrapperRowData]) {
       val writeField = if (rowClass == classOf[GenericRowData]) {
@@ -538,7 +525,7 @@ object CodeGenUtils {
         s"$rowTerm.setNullAt($indexTerm)"
       }
 
-      if (ctx.nullCheck) {
+      if (fieldType.isNullable) {
         s"""
            |${fieldExpr.code}
            |if (${fieldExpr.nullTerm}) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index f552958..c5f6f14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -145,9 +145,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   def getReusableInputUnboxingExprs(inputTerm: String, index: Int): Option[GeneratedExpression] =
     reusableInputUnboxingExprs.get((inputTerm, index))
 
-  def nullCheck: Boolean = tableConfig.getNullCheck
-
-
   /**
     * Add a line comment to [[reusableHeaderComments]] list which will be concatenated
     * into a single class header comment.
@@ -677,7 +674,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |""".stripMargin
 
     val fieldInit = seedExpr match {
-      case Some(s) if nullCheck =>
+      case Some(s) =>
         s"""
            |${s.code}
            |if (!${s.nullTerm}) {
@@ -687,11 +684,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
            |  $fieldTerm = new java.util.Random();
            |}
            |""".stripMargin
-      case Some(s) =>
-        s"""
-           |${s.code}
-           |$fieldTerm = new java.util.Random(${s.resultTerm});
-           |""".stripMargin
       case _ =>
         s"""
            |$fieldTerm = new java.util.Random();
@@ -968,7 +960,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |  throw new RuntimeException("Unsupported algorithm.");
          |}
          |""".stripMargin
-    val nullableInit = if (nullCheck) {
+    val nullableInit =
       s"""
          |${constant.code}
          |if (${constant.nullTerm}) {
@@ -977,12 +969,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |  $init
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${constant.code}
-         |$init
-         |""".stripMargin
-    }
+
     reusableInitStatements.add(nullableInit)
 
     fieldTerm
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3c3f858..38386e9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -58,11 +58,6 @@ import scala.collection.JavaConversions._
 class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
   extends RexVisitor[GeneratedExpression] {
 
-  // check if nullCheck is enabled when inputs can be null
-  if (nullableInput && !ctx.nullCheck) {
-    throw new CodeGenException("Null check must be enabled if entire rows can be null.")
-  }
-
   /**
     * term of the [[ProcessFunction]]'s context, can be changed when needed
     */
@@ -191,8 +186,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         // attribute is proctime indicator.
         // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
       case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
         // attribute is proctime field in a batch query.
         // it is initialized with the current time.
@@ -216,7 +210,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
           input2Term.get,
           idx,
           nullableInput,
-          ctx.nullCheck)
+          true)
         ).toSeq
       case None => Seq() // add nothing
     }
@@ -330,7 +324,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
 
     val code = if (returnTypeClazz == classOf[BinaryRowData] && outRowWriter.isDefined) {
       val writer = outRowWriter.get
-      val resetWriter = if (ctx.nullCheck) s"$writer.reset();" else s"$writer.resetCursor();"
+      val resetWriter = s"$writer.reset();"
       val completeWriter: String = s"$writer.complete();"
       s"""
          |$outRowInitCode
@@ -366,7 +360,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       inputRef.getIndex - input1Arity
     }
 
-    generateInputAccess(ctx, input._1, input._2, index, nullableInput, ctx.nullCheck)
+    generateInputAccess(ctx, input._1, input._2, index, nullableInput, true)
   }
 
   override def visitTableInputRef(rexTableInputRef: RexTableInputRef): GeneratedExpression =
@@ -381,13 +375,15 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       refExpr.resultTerm,
       index)
 
-    val resultTypeTerm = primitiveTypeTermForType(fieldAccessExpr.resultType)
-    val defaultValue = primitiveDefaultValue(fieldAccessExpr.resultType)
+    val resultType = fieldAccessExpr.resultType
+
+    val resultTypeTerm = primitiveTypeTermForType(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
     val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode =
       s"""
          |${refExpr.code}
          |if (${refExpr.nullTerm}) {
@@ -400,13 +396,6 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
          |  $nullTerm = ${fieldAccessExpr.nullTerm};
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${refExpr.code}
-         |${fieldAccessExpr.code}
-         |$resultTerm = ${fieldAccessExpr.resultTerm};
-         |""".stripMargin
-    }
 
     GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
   }
@@ -479,7 +468,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case (operandLiteral: RexLiteral, 0) if
       operandLiteral.getType.getSqlTypeName == SqlTypeName.NULL &&
         call.getOperator.getReturnTypeInference == ReturnTypes.ARG0 =>
-        generateNullLiteral(resultType, ctx.nullCheck)
+        generateNullLiteral(resultType)
 
       case (o@_, _) => o.accept(this)
     }
@@ -627,25 +616,25 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
 
       case IS_NULL =>
         val operand = operands.head
-        generateIsNull(ctx, operand)
+        generateIsNull(operand)
 
       case IS_NOT_NULL =>
         val operand = operands.head
-        generateIsNotNull(ctx, operand)
+        generateIsNotNull(operand)
 
       // logic
       case AND =>
         operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
           requireBoolean(left)
           requireBoolean(right)
-          generateAnd(ctx, left, right)
+          generateAnd(left, right)
         }
 
       case OR =>
         operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
           requireBoolean(left)
           requireBoolean(right)
-          generateOr(ctx, left, right)
+          generateOr(left, right)
         }
 
       case NOT =>
@@ -711,7 +700,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             val array = operands.head
             val index = operands(1)
             requireInteger(index)
-            generateArrayElementAt(ctx, array, index)
+            generateArrayElementAt(array, index)
 
           case LogicalTypeRoot.MAP =>
             val key = operands(1)
@@ -739,7 +728,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case ELEMENT =>
         val array = operands.head
         requireArray(array)
-        generateArrayElement(ctx, array)
+        generateArrayElement(array)
 
       case DOT =>
         generateDot(ctx, operands)
@@ -748,8 +737,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         // attribute is proctime indicator.
         // We use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
 
       case PROCTIME_MATERIALIZE =>
         generateProctimeTimestamp(ctx, contextTerm)
@@ -764,7 +752,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case JSON_ARRAY => new JsonArrayCallGen(call).generate(ctx, operands, resultType)
 
       case _: SqlThrowExceptionFunction =>
-        val nullValue = generateNullLiteral(resultType, nullCheck = true)
+        val nullValue = generateNullLiteral(resultType)
         val code =
           s"""
              |${operands.map(_.code).mkString("\n")}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 5c6018d..dbc8d9a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -82,7 +82,7 @@ object GenerateUtils {
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
     val defaultValue = primitiveDefaultValue(returnType)
     val isResultNullable = resultNullable || (isReference(returnType) && !isTemporal(returnType))
-    val nullTermCode = if (ctx.nullCheck && isResultNullable) {
+    val nullTermCode = if (isResultNullable) {
       s"$nullTerm = ($resultTerm == null);"
     } else {
       ""
@@ -107,7 +107,7 @@ object GenerateUtils {
          |""".stripMargin
     }
 
-    val resultCode = if (ctx.nullCheck && operands.nonEmpty) {
+    val resultCode = if (operands.nonEmpty) {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
@@ -117,19 +117,13 @@ object GenerateUtils {
          |  $nullTermCode
          |}
          |""".stripMargin
-    } else if (ctx.nullCheck && operands.isEmpty) {
+    } else {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$nullTerm = false;
          |$wrappedResultAssignment
          |$nullTermCode
          |""".stripMargin
-    } else {
-      s"""
-         |$nullTerm = false;
-         |${operands.map(_.code).mkString("\n")}
-         |$wrappedResultAssignment
-         |""".stripMargin
     }
 
     GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
@@ -169,7 +163,7 @@ object GenerateUtils {
     val nullTerm = ctx.addReusableLocalVariable("boolean", "isNull")
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
     val isResultNullable = resultNullable || (isReference(returnType) && !isTemporal(returnType))
-    val nullTermCode = if (ctx.nullCheck && isResultNullable) {
+    val nullTermCode = if (isResultNullable) {
       s"$nullTerm = ($resultTerm == null);"
     } else {
       s"$nullTerm = false;"
@@ -199,7 +193,7 @@ object GenerateUtils {
          |""".stripMargin
     }
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode = if (resultNullable) {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$wrappedResultAssignment
@@ -216,7 +210,6 @@ object GenerateUtils {
        """.stripMargin
     }
 
-
     GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
   }
 
@@ -277,12 +270,7 @@ object GenerateUtils {
       s"$recordTerm = new $typeTerm();"
   }
 
-  def generateNullLiteral(
-      resultType: LogicalType,
-      nullCheck: Boolean): GeneratedExpression = {
-    if (!nullCheck) {
-      throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
-    }
+  def generateNullLiteral(resultType: LogicalType): GeneratedExpression = {
     val defaultValue = primitiveDefaultValue(resultType)
     val resultTypeTerm = primitiveTypeTermForType(resultType)
     GeneratedExpression(
@@ -318,7 +306,7 @@ object GenerateUtils {
       literalValue: Any,
       literalType: LogicalType): GeneratedExpression = {
     if (literalValue == null) {
-      return generateNullLiteral(literalType, ctx.nullCheck)
+      return generateNullLiteral(literalType)
     }
     literalType.getTypeRoot match {
       // For strings, binary and decimal, we add the literal as reusable field,
@@ -557,7 +545,7 @@ object GenerateUtils {
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
 
-    val wrappedCode = if (ctx.nullCheck) {
+    val wrappedCode =
       s"""
          |$nullTerm = $inputTerm == null;
          |$resultTerm = $defaultValue;
@@ -565,11 +553,6 @@ object GenerateUtils {
          |  $resultTerm = $inputUnboxingTerm;
          |}
          |""".stripMargin.trim
-    } else {
-      s"""
-         |$resultTerm = $inputUnboxingTerm;
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, wrappedCode, inputType)
   }
@@ -613,7 +596,7 @@ object GenerateUtils {
           (resultTypeTerm, "field"),
           ("boolean", "isNull"))
 
-        val inputCode = if (ctx.nullCheck) {
+        val inputCode =
           s"""
              |$nullTerm = $inputTerm.isNullAt($index);
              |$fieldTerm = $defaultValue;
@@ -621,12 +604,7 @@ object GenerateUtils {
              |  $fieldTerm = $readCode;
              |}
            """.stripMargin.trim
-        } else {
-          s"""
-             |$nullTerm = false;
-             |$fieldTerm = $readCode;
-           """.stripMargin
-        }
+
         GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType)
 
       case DISTINCT_TYPE =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
index 8c14843..71a5493 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
@@ -372,8 +372,7 @@ class MatchCodeGenerator(
         // attribute is proctime indicator.
         // We use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
 
       case MATCH_ROWTIME =>
         generateRowtimeAccess(
@@ -640,7 +639,7 @@ class MatchCodeGenerator(
       ctx.addReusablePerRecordStatement(codeForAgg)
 
       val defaultValue = primitiveDefaultValue(singleAggResultType)
-      val codeForSingleAgg = if (ctx.nullCheck) {
+      val codeForSingleAgg =
         j"""
            |boolean $singleAggNullTerm;
            |$primitiveSingleAggResultTypeTerm $singleAggResultTerm;
@@ -653,12 +652,6 @@ class MatchCodeGenerator(
            |  $singleAggResultTerm = $defaultValue;
            |}
            |""".stripMargin
-      } else {
-        j"""
-           |$primitiveSingleAggResultTypeTerm $singleAggResultTerm =
-           |    ($boxedSingleAggResultTypeTerm) $allAggRowTerm.getField(${aggregates.size});
-           |""".stripMargin
-      }
 
       ctx.addReusablePerRecordStatement(codeForSingleAgg)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 60059e3..50bb157 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -492,10 +492,7 @@ object HashAggCodeGenHelper {
              |""".stripMargin.trim
 
         if (filterArg >= 0) {
-          var filterTerm = s"$inputTerm.getBoolean($filterArg)"
-          if (ctx.nullCheck) {
-            filterTerm = s"!$inputTerm.isNullAt($filterArg) && " + filterTerm
-          }
+          val filterTerm = s"!$inputTerm.isNullAt($filterArg) && $inputTerm.getBoolean($filterArg)"
           s"""
              |if ($filterTerm) {
              | $innerCode
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6288b96..020708d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.flink.util.Preconditions.checkArgument
 
 import java.time.ZoneId
-import java.util.Arrays.asList
+
 import scala.collection.JavaConversions._
 
 /**
@@ -449,8 +449,8 @@ object ScalarOperatorGens {
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
-    generateOr(ctx,
-      generateAnd(ctx, generateIsNull(ctx, left), generateIsNull(ctx, right)),
+    generateOr(
+      generateAnd(generateIsNull(left), generateIsNull(right)),
       generateEquals(ctx, left, right))
   }
 
@@ -579,13 +579,11 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateIsNull(
-      ctx: CodeGeneratorContext,
-      operand: GeneratedExpression): GeneratedExpression = {
-    if (ctx.nullCheck) {
+  def generateIsNull(operand: GeneratedExpression): GeneratedExpression = {
+    if (operand.resultType.isNullable) {
       GeneratedExpression(operand.nullTerm, NEVER_NULL, operand.code, new BooleanType(false))
     }
-    else if (!ctx.nullCheck && isReference(operand.resultType)) {
+    else if (isReference(operand.resultType)) {
       val resultTerm = newName("isNull")
       val operatorCode =
         s"""
@@ -599,10 +597,8 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateIsNotNull(
-      ctx: CodeGeneratorContext,
-      operand: GeneratedExpression): GeneratedExpression = {
-    if (ctx.nullCheck) {
+  def generateIsNotNull(operand: GeneratedExpression): GeneratedExpression = {
+    if (operand.resultType.isNullable) {
       val resultTerm = newName("result")
       val operatorCode =
         s"""
@@ -611,7 +607,7 @@ object ScalarOperatorGens {
            |""".stripMargin.trim
       GeneratedExpression(resultTerm, NEVER_NULL, operatorCode, new BooleanType(false))
     }
-    else if (!ctx.nullCheck && isReference(operand.resultType)) {
+    else if (isReference(operand.resultType)) {
       val resultTerm = newName("result")
       val operatorCode =
         s"""
@@ -625,13 +621,10 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateAnd(
-      ctx: CodeGeneratorContext,
-      left: GeneratedExpression,
-      right: GeneratedExpression): GeneratedExpression = {
+  def generateAnd(left: GeneratedExpression, right: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-    val operatorCode = if (ctx.nullCheck) {
+    val operatorCode =
       // Three-valued logic:
       // no Unknown -> Two-valued logic
       // True && Unknown -> Unknown
@@ -675,28 +668,14 @@ object ScalarOperatorGens {
          |  }
          |}
        """.stripMargin.trim
-    }
-    else {
-      s"""
-         |${left.code}
-         |boolean $resultTerm = false;
-         |if (${left.resultTerm}) {
-         |  ${right.code}
-         |  $resultTerm = ${right.resultTerm};
-         |}
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
   }
 
-  def generateOr(
-      ctx: CodeGeneratorContext,
-      left: GeneratedExpression,
-      right: GeneratedExpression): GeneratedExpression = {
+  def generateOr(left: GeneratedExpression, right: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-    val operatorCode = if (ctx.nullCheck) {
+    val operatorCode =
       // Three-valued logic:
       // no Unknown -> Two-valued logic
       // True || Unknown -> True
@@ -707,14 +686,14 @@ object ScalarOperatorGens {
       s"""
          |${left.code}
          |
-        |boolean $resultTerm = true;
+         |boolean $resultTerm = true;
          |boolean $nullTerm = false;
          |if (!${left.nullTerm} && ${left.resultTerm}) {
          |  // left expr is true, skip right expr
          |} else {
          |  ${right.code}
          |
-        |  if (!${left.nullTerm} && !${right.nullTerm}) {
+         |  if (!${left.nullTerm} && !${right.nullTerm}) {
          |    $resultTerm = ${left.resultTerm} || ${right.resultTerm};
          |    $nullTerm = false;
          |  }
@@ -740,17 +719,6 @@ object ScalarOperatorGens {
          |  }
          |}
          |""".stripMargin.trim
-    }
-    else {
-      s"""
-         |${left.code}
-         |boolean $resultTerm = true;
-         |if (!${left.resultTerm}) {
-         |  ${right.code}
-         |  $resultTerm = ${right.resultTerm};
-         |}
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
   }
@@ -869,12 +837,7 @@ object ScalarOperatorGens {
     val rule = CastRuleProvider.resolve(operand.resultType, targetType)
     rule match {
       case codeGeneratorCastRule: CodeGeneratorCastRule[_, _] =>
-        // Make sure to force nullability checks in case ctx.nullCheck is enabled
-        val inputType = if (ctx.nullCheck) {
-          operand.resultType.copy(true)
-        } else {
-          operand.resultType
-        }
+        val inputType = operand.resultType.copy(true)
 
         // Generate the code block
         val castCodeBlock = codeGeneratorCastRule.generateCodeBlock(
@@ -969,7 +932,7 @@ object ScalarOperatorGens {
       val resultTypeTerm = primitiveTypeTermForType(resultType)
       val defaultValue = primitiveDefaultValue(resultType)
 
-      val operatorCode = if (ctx.nullCheck) {
+      val operatorCode =
         s"""
            |${condition.code}
            |$resultTypeTerm $resultTerm = $defaultValue;
@@ -989,21 +952,6 @@ object ScalarOperatorGens {
            |  }
            |}
            |""".stripMargin.trim
-      }
-      else {
-        s"""
-           |${condition.code}
-           |$resultTypeTerm $resultTerm;
-           |if (${condition.resultTerm}) {
-           |  ${trueAction.code}
-           |  $resultTerm = ${trueAction.resultTerm};
-           |}
-           |else {
-           |  ${falseAction.code}
-           |  $resultTerm = ${falseAction.resultTerm};
-           |}
-           |""".stripMargin.trim
-      }
 
       GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
     }
@@ -1041,7 +989,7 @@ object ScalarOperatorGens {
     val resultTypeTerm = primitiveTypeTermForType(access.resultType)
     val defaultValue = primitiveDefaultValue(access.resultType)
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode =
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$resultTypeTerm $resultTerm;
@@ -1056,14 +1004,6 @@ object ScalarOperatorGens {
          |  $nullTerm = ${access.nullTerm};
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${operands.map(_.code).mkString("\n")}
-         |${access.code}
-         |$resultTypeTerm $resultTerm = ${access.resultTerm};
-         |""".stripMargin
-    }
-
 
     GeneratedExpression(
       resultTerm,
@@ -1105,7 +1045,7 @@ object ScalarOperatorGens {
           val tpe = fieldTypes(idx)
           if (element.literal) {
             ""
-          } else if(ctx.nullCheck) {
+          } else if (tpe.isNullable) {
             s"""
                |${element.code}
                |if (${element.nullTerm}) {
@@ -1152,7 +1092,7 @@ object ScalarOperatorGens {
     val writeCode = elements.zipWithIndex.map {
       case (element, idx) =>
         val tpe = fieldTypes(idx)
-        if (ctx.nullCheck) {
+        if (tpe.isNullable) {
           s"""
              |${element.code}
              |if (${element.nullTerm}) {
@@ -1207,7 +1147,7 @@ object ScalarOperatorGens {
           }
         }
         val array = generateLiteralArray(ctx, arrayType, mapped)
-        val code = generatePrimitiveArrayUpdateCode(ctx, array.resultTerm, elementType, elements)
+        val code = generatePrimitiveArrayUpdateCode(array.resultTerm, elementType, elements)
         GeneratedExpression(array.resultTerm, GeneratedExpression.NEVER_NULL, code, arrayType)
       } else {
         // generate general array
@@ -1217,14 +1157,13 @@ object ScalarOperatorGens {
   }
 
   private def generatePrimitiveArrayUpdateCode(
-      ctx: CodeGeneratorContext,
       arrayTerm: String,
       elementType: LogicalType,
       elements: Seq[GeneratedExpression]): String = {
     elements.zipWithIndex.map { case (element, idx) =>
       if (element.literal) {
         ""
-      } else if (ctx.nullCheck) {
+      } else if (elementType.isNullable) {
         s"""
            |${element.code}
            |if (${element.nullTerm}) {
@@ -1299,7 +1238,6 @@ object ScalarOperatorGens {
    * @see [[org.apache.calcite.sql.fun.SqlStdOperatorTable.ITEM]]
    */
   def generateArrayElementAt(
-      ctx: CodeGeneratorContext,
       array: GeneratedExpression,
       index: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
@@ -1329,9 +1267,7 @@ object ScalarOperatorGens {
     GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, componentInfo)
   }
 
-  def generateArrayElement(
-      ctx: CodeGeneratorContext,
-      array: GeneratedExpression): GeneratedExpression = {
+  def generateArrayElement(array: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
     val resultType = array.resultType.asInstanceOf[ArrayType].getElementType
     val resultTypeTerm = primitiveTypeTermForType(resultType)
@@ -1457,10 +1393,8 @@ object ScalarOperatorGens {
       // there are some non-literal primitive fields need to update
       val keyArrayTerm = newName("keyArray")
       val valueArrayTerm = newName("valueArray")
-      val keyUpdate = generatePrimitiveArrayUpdateCode(
-        ctx, keyArrayTerm, keyType, keyElements)
-      val valueUpdate = generatePrimitiveArrayUpdateCode(
-        ctx, valueArrayTerm, valueType, valueElements)
+      val keyUpdate = generatePrimitiveArrayUpdateCode(keyArrayTerm, keyType, keyElements)
+      val valueUpdate = generatePrimitiveArrayUpdateCode(valueArrayTerm, valueType, valueElements)
       s"""
          |$BINARY_ARRAY $keyArrayTerm = $binaryMap.keyArray();
          |$keyUpdate
@@ -1597,7 +1531,7 @@ object ScalarOperatorGens {
      resultType: LogicalType): GeneratedExpression = {
     checkArgument(literalExpr.literal)
     if (java.lang.Boolean.valueOf(literalExpr.nullTerm)) {
-      return generateNullLiteral(resultType, nullCheck = true)
+      return generateNullLiteral(resultType)
     }
 
     val castExecutor = CastRuleProvider.create(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
index 42c68d2..8b388b7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
@@ -82,14 +82,14 @@ object SearchOperatorGen {
         .map(CastRuleProvider.cast(toCastContext(ctx), sargType, commonType, _))
         .map(generateLiteral(ctx, _, commonType))
       if (sarg.containsNull) {
-        haystack += generateNullLiteral(commonType, ctx.nullCheck)
+        haystack += generateNullLiteral(commonType)
       }
       val setTerm = ctx.addReusableHashSet(haystack.toSeq, commonType)
       val negation = if (sarg.isComplementedPoints) "!" else ""
 
       val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-      val operatorCode = if (ctx.nullCheck) {
+      val operatorCode =
         s"""
            |${needle.code}
            |// --- Begin SEARCH ${target.resultTerm}
@@ -101,15 +101,6 @@ object SearchOperatorGen {
            |}
            |// --- End SEARCH ${target.resultTerm}
            |""".stripMargin.trim
-      }
-      else {
-        s"""
-           |${needle.code}
-           |// --- Begin SEARCH ${target.resultTerm}
-           |boolean $resultTerm = $negation$setTerm.contains(${needle.resultTerm});
-           |// --- End SEARCH ${target.resultTerm}
-           |""".stripMargin.trim
-      }
 
       GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
     } else {
@@ -127,11 +118,11 @@ object SearchOperatorGen {
         .map(RangeSets.map(_, rangeToExpression))
 
       if (sarg.containsNull) {
-        rangeChecks = Seq(generateIsNull(ctx, target)) ++ rangeChecks
+        rangeChecks = Seq(generateIsNull(target)) ++ rangeChecks
       }
 
       val generatedRangeChecks = rangeChecks
-        .reduce((left, right) => generateOr(ctx, left, right))
+        .reduce((left, right) => generateOr(left, right))
 
       // Add the target expression code
       val finalCode =
@@ -194,7 +185,6 @@ object SearchOperatorGen {
      */
     override def closed(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<=", lit(lower), target),
         generateComparison(ctx, "<=", target, lit(upper))
       )
@@ -205,7 +195,6 @@ object SearchOperatorGen {
      */
     override def closedOpen(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<=", lit(lower), target),
         generateComparison(ctx, "<", target, lit(upper))
       )
@@ -216,7 +205,6 @@ object SearchOperatorGen {
      */
     override def openClosed(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<", lit(lower), target),
         generateComparison(ctx, "<=", target, lit(upper))
       )
@@ -227,7 +215,6 @@ object SearchOperatorGen {
      */
     override def open(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<", lit(lower), target),
         generateComparison(ctx, "<", target, lit(upper))
       )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
index a7b7add..70b753d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
@@ -326,8 +326,6 @@ class JoinITCase extends BatchTestBase {
 
   @Test
   def testFullJoinWithNonEquiJoinPred(): Unit = {
-    tEnv.getConfig.setNullCheck(true)
-
     val ds1 = CollectionBatchExecTable.get3TupleDataSet(tEnv, "a, b, c")
     val ds2 = CollectionBatchExecTable.get5TupleDataSet(tEnv, "d, e, f, g, h")