You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/06 08:13:15 UTC

[flink] 01/02: [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a332f8f51cfaf6fc3c4d88719ab000eef548e14f
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 3 10:23:30 2021 +0100

    [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour
    
    Add a new `ExecutionConfigOption`, so that users can choose between the legacy
    behaviour of CAST or the new one, including improvements and fixes.
    
    This closes #17985.
---
 .../generated/execution_config_configuration.html  |  5 ++++
 .../functions/AdvancedFunctionsExampleITCase.java  | 18 ++++++------
 .../table/api/config/ExecutionConfigOptions.java   | 34 ++++++++++++++++++++++
 .../planner/connectors/CollectDynamicSink.java     | 11 +++++--
 .../table/planner/connectors/DynamicSinkUtils.java |  6 +++-
 .../casting/AbstractCodeGeneratorCastRule.java     |  5 ++++
 .../AbstractExpressionCodeGeneratorCastRule.java   |  5 ++++
 .../table/planner/functions/casting/CastRule.java  | 10 ++++++-
 .../functions/casting/CodeGeneratorCastRule.java   |  4 +++
 .../casting/RowDataToStringConverterImpl.java      |  8 +++--
 .../functions/casting/RowToStringCastRule.java     | 13 ++++++++-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  2 ++
 .../planner/codegen/calls/ScalarOperatorGens.scala |  8 +++++
 .../planner/functions/casting/CastRulesTest.java   | 18 ++++++++++--
 .../expressions/utils/ExpressionTestBase.scala     |  6 ++++
 15 files changed, 133 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index e0c9c53..e809eb0 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -58,6 +58,11 @@ By default no operator is disabled.</td>
             <td><p>Enum</p></td>
             <td>Determines whether string values for columns with CHAR(&lt;precision&gt;)/VARCHAR(&lt;precision&gt;) types will be trimmed or padded (only for CHAR(&lt;precision&gt;)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match  [...]
         </tr>
+            <td><h5>table.exec.sink.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">ENABLED</td>
+            <td><p>Enum</p></td>
+            <td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">ERROR</td>
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
index 3306911..4bc98c2 100644
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -41,41 +41,41 @@ public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Guillermo Smith |                (5, 2020-12-05) |"));
+                        "|                Guillermo Smith |                 (5,2020-12-05) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                    John Turner |               (12, 2020-10-02) |"));
+                        "|                    John Turner |                (12,2020-10-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Brandy Sanders |                (1, 2020-10-14) |"));
+                        "|                 Brandy Sanders |                 (1,2020-10-14) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Valeria Mendoza |               (10, 2020-06-02) |"));
+                        "|                Valeria Mendoza |                (10,2020-06-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                   Ellen Ortega |              (100, 2020-06-18) |"));
+                        "|                   Ellen Ortega |               (100,2020-06-18) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Leann Holloway |                (9, 2020-05-26) |"));
+                        "|                 Leann Holloway |                 (9,2020-05-26) |"));
     }
 
     private void testExecuteInternalRowMergerFunction(String consoleOutput) {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Guillermo Smith | (1992-12-12, New Jersey, 81... |"));
+                        "|                Guillermo Smith | (1992-12-12,New Jersey,816-... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Valeria Mendoza | (1970-03-28, Los Angeles, 9... |"));
+                        "|                Valeria Mendoza | (1970-03-28,Los Angeles,928... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Leann Holloway | (1989-05-21, Eugene, 614-88... |"));
+                        "|                 Leann Holloway | (1989-05-21,Eugene,614-889-... |"));
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 5efa77f..0a8d417 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -386,6 +386,15 @@ public class ExecutionConfigOptions {
                                                     + "Pipelined shuffle means data will be sent to consumer tasks once produced.")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
+            key("table.exec.sink.legacy-cast-behaviour")
+                    .enumType(LegacyCastBehaviour.class)
+                    .defaultValue(LegacyCastBehaviour.ENABLED)
+                    .withDescription(
+                            "Determines whether CAST will operate following the legacy behaviour "
+                                    + "or the new one that introduces various fixes and improvements.");
+
     // ------------------------------------------------------------------------------------------
     // Enum option types
     // ------------------------------------------------------------------------------------------
@@ -453,4 +462,29 @@ public class ExecutionConfigOptions {
         /** Add materialize operator in any case. */
         FORCE
     }
+
+    /** Determine if CAST operates using the legacy behaviour or the new one. */
+    @Deprecated
+    public enum LegacyCastBehaviour implements DescribedEnum {
+        ENABLED(true, text("CAST will operate following the legacy behaviour.")),
+        DISABLED(false, text("CAST will operate following the new correct behaviour."));
+
+        private final boolean enabled;
+        private final InlineElement description;
+
+        LegacyCastBehaviour(boolean enabled, InlineElement description) {
+            this.enabled = enabled;
+            this.description = description;
+        }
+
+        @Internal
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+
+        public boolean isEnabled() {
+            return enabled;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 575fab7..98fcf8b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -57,6 +57,7 @@ final class CollectDynamicSink implements DynamicTableSink {
     private final Duration socketTimeout;
     private final ClassLoader classLoader;
     private final ZoneId sessionZoneId;
+    private final boolean legacyCastBehaviour;
 
     // mutable attributes
     private CollectResultIterator<RowData> iterator;
@@ -68,18 +69,21 @@ final class CollectDynamicSink implements DynamicTableSink {
             MemorySize maxBatchSize,
             Duration socketTimeout,
             ClassLoader classLoader,
-            ZoneId sessionZoneId) {
+            ZoneId sessionZoneId,
+            boolean legacyCastBehaviour) {
         this.tableIdentifier = tableIdentifier;
         this.consumedDataType = consumedDataType;
         this.maxBatchSize = maxBatchSize;
         this.socketTimeout = socketTimeout;
         this.classLoader = classLoader;
         this.sessionZoneId = sessionZoneId;
+        this.legacyCastBehaviour = legacyCastBehaviour;
     }
 
     public ResultProvider getSelectResultProvider() {
         return new CollectResultProvider(
-                new RowDataToStringConverterImpl(consumedDataType, sessionZoneId, classLoader));
+                new RowDataToStringConverterImpl(
+                        consumedDataType, sessionZoneId, classLoader, legacyCastBehaviour));
     }
 
     @Override
@@ -132,7 +136,8 @@ final class CollectDynamicSink implements DynamicTableSink {
                 maxBatchSize,
                 socketTimeout,
                 classLoader,
-                sessionZoneId);
+                sessionZoneId,
+                legacyCastBehaviour);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 87c432e..acb557a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
@@ -115,7 +116,10 @@ public final class DynamicSinkUtils {
                         configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                         configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                         classLoader,
-                        zoneId);
+                        zoneId,
+                        configuration
+                                .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
+                                .isEnabled());
         collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
         collectModifyOperation.setConsumedDataType(consumedDataType);
         return convertSinkToRel(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
index 9e068b8..209ee8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
@@ -182,6 +182,11 @@ abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<I
         }
 
         @Override
+        public boolean legacyBehaviour() {
+            return castRuleCtx.legacyBehaviour();
+        }
+
+        @Override
         public String getSessionTimeZoneTerm() {
             return "java.util.TimeZone.getTimeZone(\""
                     + castRuleCtx.getSessionZoneId().getId()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 52c0b85..a737678 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -104,6 +104,11 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
             CastRule.Context ctx) {
         return new CodeGeneratorCastRule.Context() {
             @Override
+            public boolean legacyBehaviour() {
+                return ctx.legacyBehaviour();
+            }
+
+            @Override
             public String getSessionTimeZoneTerm() {
                 return "java.util.TimeZone.getTimeZone(\"" + ctx.getSessionZoneId().getId() + "\")";
             }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index 58217e4..78d6f2d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -49,14 +49,22 @@ public interface CastRule<IN, OUT> {
 
     /** Casting context. */
     interface Context {
+        @Deprecated
+        boolean legacyBehaviour();
+
         ZoneId getSessionZoneId();
 
         ClassLoader getClassLoader();
 
         /** Create a casting context. */
-        static Context create(ZoneId zoneId, ClassLoader classLoader) {
+        static Context create(boolean legacyBehaviour, ZoneId zoneId, ClassLoader classLoader) {
             return new Context() {
                 @Override
+                public boolean legacyBehaviour() {
+                    return legacyBehaviour;
+                }
+
+                @Override
                 public ZoneId getSessionZoneId() {
                     return zoneId;
                 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
index 2f9de82..1b189fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
@@ -44,6 +44,10 @@ public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> {
 
     /** Context for code generation. */
     interface Context {
+        /** @return where the legacy behaviour should be followed or not. */
+        @Deprecated
+        boolean legacyBehaviour();
+
         /** @return the session time zone term */
         String getSessionTimeZoneTerm();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
index 62c8e53..b3be464 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
@@ -45,11 +45,13 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
         this(
                 dataType,
                 DateTimeUtils.UTC_ZONE.toZoneId(),
-                Thread.currentThread().getContextClassLoader());
+                Thread.currentThread().getContextClassLoader(),
+                true);
     }
 
     @SuppressWarnings("unchecked")
-    public RowDataToStringConverterImpl(DataType dataType, ZoneId zoneId, ClassLoader classLoader) {
+    public RowDataToStringConverterImpl(
+            DataType dataType, ZoneId zoneId, ClassLoader classLoader, boolean legacyBehaviour) {
         List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType);
         this.columnConverters = new Function[rowDataTypes.size()];
 
@@ -60,7 +62,7 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
             CastExecutor<Object, StringData> castExecutor =
                     (CastExecutor<Object, StringData>)
                             CastRuleProvider.create(
-                                    CastRule.Context.create(zoneId, classLoader),
+                                    CastRule.Context.create(legacyBehaviour, zoneId, classLoader),
                                     fieldType,
                                     STRING().getLogicalType());
             if (castExecutor == null) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
index baa5119..911d262 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
@@ -125,7 +125,8 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
 
             // Write the comma
             if (fieldIndex != 0) {
-                writer.stmt(methodCall(builderTerm, "append", strLiteral(", ")));
+                final String comma = getDelimiter(context);
+                writer.stmt(methodCall(builderTerm, "append", comma));
             }
 
             writer
@@ -167,4 +168,14 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
 
         return writer.toString();
     }
+
+    private String getDelimiter(CodeGeneratorCastRule.Context context) {
+        final String comma;
+        if (context.legacyBehaviour()) {
+            comma = strLiteral(",");
+        } else {
+            comma = strLiteral(", ");
+        }
+        return comma;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index b21d097..7a72e25 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -54,6 +54,8 @@ object CodeGenUtils {
 
   // ------------------------------- DEFAULT TERMS ------------------------------------------
 
+  val DEFAULT_LEGACY_CAST_BEHAVIOUR = "legacyCastBehaviour"
+
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
   val DEFAULT_INPUT1_TERM = "in1"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index fd5da47..045976c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.data.util.MapDataUtil
 import org.apache.flink.table.data.utils.CastExecutor
@@ -2162,6 +2163,7 @@ object ScalarOperatorGens {
 
   def toCodegenCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
     new CodeGeneratorCastRule.Context {
+      override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
       override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
       override def declareVariable(ty: String, variablePrefix: String): String =
         ctx.addReusableLocalVariable(ty, variablePrefix)
@@ -2176,10 +2178,16 @@ object ScalarOperatorGens {
 
   def toCastContext(ctx: CodeGeneratorContext): CastRule.Context = {
     new CastRule.Context {
+      override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
+
       override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone
 
       override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
     }
   }
 
+  private def isLegacyCastBehaviourEnabled(ctx: CodeGeneratorContext) = {
+    ctx.tableConfig
+      .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index a0c66b5..6f74b45 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -95,7 +95,7 @@ class CastRulesTest {
     private static final ZoneId CET = ZoneId.of("CET");
 
     private static final CastRule.Context CET_CONTEXT =
-            CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
+            CastRule.Context.create(false, CET, Thread.currentThread().getContextClassLoader());
 
     private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
     private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -614,7 +614,14 @@ class CastRulesTest {
                                 ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
                                 GenericRowData.of(
                                         StringData.fromString("abc"), StringData.fromString("def")),
-                                StringData.fromString("(abc, def)"))
+                                StringData.fromString("(abc, def)"),
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
+                                GenericRowData.of(
+                                        StringData.fromString("abc"), StringData.fromString("def")),
+                                StringData.fromString("(abc,def)"),
+                                true)
                         .fromCase(
                                 ROW(FIELD("f0", INT().nullable()), FIELD("f1", STRING())),
                                 GenericRowData.of(null, StringData.fromString("abc")),
@@ -860,9 +867,15 @@ class CastRulesTest {
         }
 
         private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object target) {
+            return fromCase(dataType, src, target, false);
+        }
+
+        private CastTestSpecBuilder fromCase(
+                DataType dataType, Object src, Object target, boolean legacyBehaviour) {
             return fromCase(
                     dataType,
                     CastRule.Context.create(
+                            legacyBehaviour,
                             DateTimeUtils.UTC_ZONE.toZoneId(),
                             Thread.currentThread().getContextClassLoader()),
                     src,
@@ -900,6 +913,7 @@ class CastRulesTest {
             return fail(
                     dataType,
                     CastRule.Context.create(
+                            false,
                             DateTimeUtils.UTC_ZONE.toZoneId(),
                             Thread.currentThread().getContextClassLoader()),
                     src,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 5204b94..b62f158 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
@@ -50,6 +51,7 @@ import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.logical.{RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
+
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
@@ -98,6 +100,10 @@ abstract class ExpressionTestBase {
 
   @Before
   def prepare(): Unit = {
+    config.getConfiguration.set(
+      ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+      ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
+    )
     if (containsLegacyTypes) {
       val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
       tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)