You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/06 08:13:15 UTC
[flink] 01/02: [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a332f8f51cfaf6fc3c4d88719ab000eef548e14f
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 3 10:23:30 2021 +0100
[FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour
Add a new option to `ExecutionConfigOptions`, so that users can choose between the legacy
behaviour of CAST and the new one, which includes improvements and fixes.
This closes #17985.
---
.../generated/execution_config_configuration.html | 5 ++++
.../functions/AdvancedFunctionsExampleITCase.java | 18 ++++++------
.../table/api/config/ExecutionConfigOptions.java | 34 ++++++++++++++++++++++
.../planner/connectors/CollectDynamicSink.java | 11 +++++--
.../table/planner/connectors/DynamicSinkUtils.java | 6 +++-
.../casting/AbstractCodeGeneratorCastRule.java | 5 ++++
.../AbstractExpressionCodeGeneratorCastRule.java | 5 ++++
.../table/planner/functions/casting/CastRule.java | 10 ++++++-
.../functions/casting/CodeGeneratorCastRule.java | 4 +++
.../casting/RowDataToStringConverterImpl.java | 8 +++--
.../functions/casting/RowToStringCastRule.java | 13 ++++++++-
.../flink/table/planner/codegen/CodeGenUtils.scala | 2 ++
.../planner/codegen/calls/ScalarOperatorGens.scala | 8 +++++
.../planner/functions/casting/CastRulesTest.java | 18 ++++++++++--
.../expressions/utils/ExpressionTestBase.scala | 6 ++++
15 files changed, 133 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index e0c9c53..e809eb0 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -58,6 +58,11 @@ By default no operator is disabled.</td>
<td><p>Enum</p></td>
<td>Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) types will be trimmed or padded (only for CHAR(<precision>)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match [...]
</tr>
+ <td><h5>table.exec.sink.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">ENABLED</td>
+ <td><p>Enum</p></td>
+ <td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td>
+ </tr>
<tr>
<td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ERROR</td>
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
index 3306911..4bc98c2 100644
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -41,41 +41,41 @@ public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase {
assertThat(
consoleOutput,
containsString(
- "| Guillermo Smith | (5, 2020-12-05) |"));
+ "| Guillermo Smith | (5,2020-12-05) |"));
assertThat(
consoleOutput,
containsString(
- "| John Turner | (12, 2020-10-02) |"));
+ "| John Turner | (12,2020-10-02) |"));
assertThat(
consoleOutput,
containsString(
- "| Brandy Sanders | (1, 2020-10-14) |"));
+ "| Brandy Sanders | (1,2020-10-14) |"));
assertThat(
consoleOutput,
containsString(
- "| Valeria Mendoza | (10, 2020-06-02) |"));
+ "| Valeria Mendoza | (10,2020-06-02) |"));
assertThat(
consoleOutput,
containsString(
- "| Ellen Ortega | (100, 2020-06-18) |"));
+ "| Ellen Ortega | (100,2020-06-18) |"));
assertThat(
consoleOutput,
containsString(
- "| Leann Holloway | (9, 2020-05-26) |"));
+ "| Leann Holloway | (9,2020-05-26) |"));
}
private void testExecuteInternalRowMergerFunction(String consoleOutput) {
assertThat(
consoleOutput,
containsString(
- "| Guillermo Smith | (1992-12-12, New Jersey, 81... |"));
+ "| Guillermo Smith | (1992-12-12,New Jersey,816-... |"));
assertThat(
consoleOutput,
containsString(
- "| Valeria Mendoza | (1970-03-28, Los Angeles, 9... |"));
+ "| Valeria Mendoza | (1970-03-28,Los Angeles,928... |"));
assertThat(
consoleOutput,
containsString(
- "| Leann Holloway | (1989-05-21, Eugene, 614-88... |"));
+ "| Leann Holloway | (1989-05-21,Eugene,614-889-... |"));
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 5efa77f..0a8d417 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -386,6 +386,15 @@ public class ExecutionConfigOptions {
+ "Pipelined shuffle means data will be sent to consumer tasks once produced.")
.build());
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
+ key("table.exec.sink.legacy-cast-behaviour")
+ .enumType(LegacyCastBehaviour.class)
+ .defaultValue(LegacyCastBehaviour.ENABLED)
+ .withDescription(
+ "Determines whether CAST will operate following the legacy behaviour "
+ + "or the new one that introduces various fixes and improvements.");
+
// ------------------------------------------------------------------------------------------
// Enum option types
// ------------------------------------------------------------------------------------------
@@ -453,4 +462,29 @@ public class ExecutionConfigOptions {
/** Add materialize operator in any case. */
FORCE
}
+
+ /** Determine if CAST operates using the legacy behaviour or the new one. */
+ @Deprecated
+ public enum LegacyCastBehaviour implements DescribedEnum {
+ ENABLED(true, text("CAST will operate following the legacy behaviour.")),
+ DISABLED(false, text("CAST will operate following the new correct behaviour."));
+
+ private final boolean enabled;
+ private final InlineElement description;
+
+ LegacyCastBehaviour(boolean enabled, InlineElement description) {
+ this.enabled = enabled;
+ this.description = description;
+ }
+
+ @Internal
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 575fab7..98fcf8b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -57,6 +57,7 @@ final class CollectDynamicSink implements DynamicTableSink {
private final Duration socketTimeout;
private final ClassLoader classLoader;
private final ZoneId sessionZoneId;
+ private final boolean legacyCastBehaviour;
// mutable attributes
private CollectResultIterator<RowData> iterator;
@@ -68,18 +69,21 @@ final class CollectDynamicSink implements DynamicTableSink {
MemorySize maxBatchSize,
Duration socketTimeout,
ClassLoader classLoader,
- ZoneId sessionZoneId) {
+ ZoneId sessionZoneId,
+ boolean legacyCastBehaviour) {
this.tableIdentifier = tableIdentifier;
this.consumedDataType = consumedDataType;
this.maxBatchSize = maxBatchSize;
this.socketTimeout = socketTimeout;
this.classLoader = classLoader;
this.sessionZoneId = sessionZoneId;
+ this.legacyCastBehaviour = legacyCastBehaviour;
}
public ResultProvider getSelectResultProvider() {
return new CollectResultProvider(
- new RowDataToStringConverterImpl(consumedDataType, sessionZoneId, classLoader));
+ new RowDataToStringConverterImpl(
+ consumedDataType, sessionZoneId, classLoader, legacyCastBehaviour));
}
@Override
@@ -132,7 +136,8 @@ final class CollectDynamicSink implements DynamicTableSink {
maxBatchSize,
socketTimeout,
classLoader,
- sessionZoneId);
+ sessionZoneId,
+ legacyCastBehaviour);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 87c432e..acb557a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
@@ -115,7 +116,10 @@ public final class DynamicSinkUtils {
configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
classLoader,
- zoneId);
+ zoneId,
+ configuration
+ .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
+ .isEnabled());
collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
collectModifyOperation.setConsumedDataType(consumedDataType);
return convertSinkToRel(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
index 9e068b8..209ee8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
@@ -182,6 +182,11 @@ abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<I
}
@Override
+ public boolean legacyBehaviour() {
+ return castRuleCtx.legacyBehaviour();
+ }
+
+ @Override
public String getSessionTimeZoneTerm() {
return "java.util.TimeZone.getTimeZone(\""
+ castRuleCtx.getSessionZoneId().getId()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 52c0b85..a737678 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -104,6 +104,11 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
CastRule.Context ctx) {
return new CodeGeneratorCastRule.Context() {
@Override
+ public boolean legacyBehaviour() {
+ return ctx.legacyBehaviour();
+ }
+
+ @Override
public String getSessionTimeZoneTerm() {
return "java.util.TimeZone.getTimeZone(\"" + ctx.getSessionZoneId().getId() + "\")";
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index 58217e4..78d6f2d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -49,14 +49,22 @@ public interface CastRule<IN, OUT> {
/** Casting context. */
interface Context {
+ @Deprecated
+ boolean legacyBehaviour();
+
ZoneId getSessionZoneId();
ClassLoader getClassLoader();
/** Create a casting context. */
- static Context create(ZoneId zoneId, ClassLoader classLoader) {
+ static Context create(boolean legacyBehaviour, ZoneId zoneId, ClassLoader classLoader) {
return new Context() {
@Override
+ public boolean legacyBehaviour() {
+ return legacyBehaviour;
+ }
+
+ @Override
public ZoneId getSessionZoneId() {
return zoneId;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
index 2f9de82..1b189fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
@@ -44,6 +44,10 @@ public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> {
/** Context for code generation. */
interface Context {
+ /** @return whether the legacy behaviour should be followed or not. */
+ @Deprecated
+ boolean legacyBehaviour();
+
/** @return the session time zone term */
String getSessionTimeZoneTerm();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
index 62c8e53..b3be464 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
@@ -45,11 +45,13 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
this(
dataType,
DateTimeUtils.UTC_ZONE.toZoneId(),
- Thread.currentThread().getContextClassLoader());
+ Thread.currentThread().getContextClassLoader(),
+ true);
}
@SuppressWarnings("unchecked")
- public RowDataToStringConverterImpl(DataType dataType, ZoneId zoneId, ClassLoader classLoader) {
+ public RowDataToStringConverterImpl(
+ DataType dataType, ZoneId zoneId, ClassLoader classLoader, boolean legacyBehaviour) {
List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType);
this.columnConverters = new Function[rowDataTypes.size()];
@@ -60,7 +62,7 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
CastExecutor<Object, StringData> castExecutor =
(CastExecutor<Object, StringData>)
CastRuleProvider.create(
- CastRule.Context.create(zoneId, classLoader),
+ CastRule.Context.create(legacyBehaviour, zoneId, classLoader),
fieldType,
STRING().getLogicalType());
if (castExecutor == null) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
index baa5119..911d262 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
@@ -125,7 +125,8 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
// Write the comma
if (fieldIndex != 0) {
- writer.stmt(methodCall(builderTerm, "append", strLiteral(", ")));
+ final String comma = getDelimiter(context);
+ writer.stmt(methodCall(builderTerm, "append", comma));
}
writer
@@ -167,4 +168,14 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
return writer.toString();
}
+
+ private String getDelimiter(CodeGeneratorCastRule.Context context) {
+ final String comma;
+ if (context.legacyBehaviour()) {
+ comma = strLiteral(",");
+ } else {
+ comma = strLiteral(", ");
+ }
+ return comma;
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index b21d097..7a72e25 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -54,6 +54,8 @@ object CodeGenUtils {
// ------------------------------- DEFAULT TERMS ------------------------------------------
+ val DEFAULT_LEGACY_CAST_BEHAVIOUR = "legacyCastBehaviour"
+
val DEFAULT_TIMEZONE_TERM = "timeZone"
val DEFAULT_INPUT1_TERM = "in1"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index fd5da47..045976c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.codegen.calls
import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.binary.BinaryArrayData
import org.apache.flink.table.data.util.MapDataUtil
import org.apache.flink.table.data.utils.CastExecutor
@@ -2162,6 +2163,7 @@ object ScalarOperatorGens {
def toCodegenCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
new CodeGeneratorCastRule.Context {
+ override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
override def declareVariable(ty: String, variablePrefix: String): String =
ctx.addReusableLocalVariable(ty, variablePrefix)
@@ -2176,10 +2178,16 @@ object ScalarOperatorGens {
def toCastContext(ctx: CodeGeneratorContext): CastRule.Context = {
new CastRule.Context {
+ override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
+
override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone
override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
}
}
+ private def isLegacyCastBehaviourEnabled(ctx: CodeGeneratorContext) = {
+ ctx.tableConfig
+ .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index a0c66b5..6f74b45 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -95,7 +95,7 @@ class CastRulesTest {
private static final ZoneId CET = ZoneId.of("CET");
private static final CastRule.Context CET_CONTEXT =
- CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
+ CastRule.Context.create(false, CET, Thread.currentThread().getContextClassLoader());
private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -614,7 +614,14 @@ class CastRulesTest {
ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
GenericRowData.of(
StringData.fromString("abc"), StringData.fromString("def")),
- StringData.fromString("(abc, def)"))
+ StringData.fromString("(abc, def)"),
+ false)
+ .fromCase(
+ ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
+ GenericRowData.of(
+ StringData.fromString("abc"), StringData.fromString("def")),
+ StringData.fromString("(abc,def)"),
+ true)
.fromCase(
ROW(FIELD("f0", INT().nullable()), FIELD("f1", STRING())),
GenericRowData.of(null, StringData.fromString("abc")),
@@ -860,9 +867,15 @@ class CastRulesTest {
}
private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object target) {
+ return fromCase(dataType, src, target, false);
+ }
+
+ private CastTestSpecBuilder fromCase(
+ DataType dataType, Object src, Object target, boolean legacyBehaviour) {
return fromCase(
dataType,
CastRule.Context.create(
+ legacyBehaviour,
DateTimeUtils.UTC_ZONE.toZoneId(),
Thread.currentThread().getContextClassLoader()),
src,
@@ -900,6 +913,7 @@ class CastRulesTest {
return fail(
dataType,
CastRule.Context.create(
+ false,
DateTimeUtils.UTC_ZONE.toZoneId(),
Thread.currentThread().getContextClassLoader()),
src,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 5204b94..b62f158 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.binary.BinaryRowData
@@ -50,6 +51,7 @@ import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.table.types.logical.{RowType, VarCharType}
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.types.Row
+
import org.junit.Assert.{assertEquals, assertTrue, fail}
import org.junit.rules.ExpectedException
import org.junit.{After, Before, Rule}
@@ -98,6 +100,10 @@ abstract class ExpressionTestBase {
@Before
def prepare(): Unit = {
+ config.getConfiguration.set(
+ ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+ ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
+ )
if (containsLegacyTypes) {
val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)