You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/06 08:13:14 UTC

[flink] branch master updated (dac6425 -> 76b47b2)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dac6425  [FLINK-17782] Add array,map,row types support for parquet row writer
     new a332f8f  [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour
     new 76b47b2  [hotfix][table-planner] Add class header comment to generated code

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/execution_config_configuration.html  |  5 ++++
 .../functions/AdvancedFunctionsExampleITCase.java  | 18 ++++++------
 .../table/api/config/ExecutionConfigOptions.java   | 34 ++++++++++++++++++++++
 .../planner/connectors/CollectDynamicSink.java     | 11 +++++--
 .../table/planner/connectors/DynamicSinkUtils.java |  6 +++-
 .../casting/AbstractCodeGeneratorCastRule.java     |  5 ++++
 .../AbstractExpressionCodeGeneratorCastRule.java   |  5 ++++
 .../table/planner/functions/casting/CastRule.java  | 10 ++++++-
 .../functions/casting/CodeGeneratorCastRule.java   |  4 +++
 .../casting/RowDataToStringConverterImpl.java      |  8 +++--
 .../functions/casting/RowToStringCastRule.java     | 13 ++++++++-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  2 ++
 .../planner/codegen/CodeGeneratorContext.scala     | 25 ++++++++++++++++
 .../planner/codegen/FunctionCodeGenerator.scala    |  1 +
 .../planner/codegen/calls/ScalarOperatorGens.scala | 14 +++++++++
 .../planner/functions/casting/CastRulesTest.java   | 18 ++++++++++--
 .../expressions/utils/ExpressionTestBase.scala     |  6 ++++
 17 files changed, 165 insertions(+), 20 deletions(-)

[flink] 01/02: [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a332f8f51cfaf6fc3c4d88719ab000eef548e14f
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 3 10:23:30 2021 +0100

    [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour
    
    Add a new `ExecutionConfigOption`, so that users can choose between the legacy
    behaviour of CAST or the new one, including improvements and fixes.
    
    This closes #17985.
---
 .../generated/execution_config_configuration.html  |  5 ++++
 .../functions/AdvancedFunctionsExampleITCase.java  | 18 ++++++------
 .../table/api/config/ExecutionConfigOptions.java   | 34 ++++++++++++++++++++++
 .../planner/connectors/CollectDynamicSink.java     | 11 +++++--
 .../table/planner/connectors/DynamicSinkUtils.java |  6 +++-
 .../casting/AbstractCodeGeneratorCastRule.java     |  5 ++++
 .../AbstractExpressionCodeGeneratorCastRule.java   |  5 ++++
 .../table/planner/functions/casting/CastRule.java  | 10 ++++++-
 .../functions/casting/CodeGeneratorCastRule.java   |  4 +++
 .../casting/RowDataToStringConverterImpl.java      |  8 +++--
 .../functions/casting/RowToStringCastRule.java     | 13 ++++++++-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  2 ++
 .../planner/codegen/calls/ScalarOperatorGens.scala |  8 +++++
 .../planner/functions/casting/CastRulesTest.java   | 18 ++++++++++--
 .../expressions/utils/ExpressionTestBase.scala     |  6 ++++
 15 files changed, 133 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index e0c9c53..e809eb0 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -58,6 +58,11 @@ By default no operator is disabled.</td>
             <td><p>Enum</p></td>
             <td>Determines whether string values for columns with CHAR(&lt;precision&gt;)/VARCHAR(&lt;precision&gt;) types will be trimmed or padded (only for CHAR(&lt;precision&gt;)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match  [...]
         </tr>
+            <td><h5>table.exec.sink.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">ENABLED</td>
+            <td><p>Enum</p></td>
+            <td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">ERROR</td>
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
index 3306911..4bc98c2 100644
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -41,41 +41,41 @@ public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Guillermo Smith |                (5, 2020-12-05) |"));
+                        "|                Guillermo Smith |                 (5,2020-12-05) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                    John Turner |               (12, 2020-10-02) |"));
+                        "|                    John Turner |                (12,2020-10-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Brandy Sanders |                (1, 2020-10-14) |"));
+                        "|                 Brandy Sanders |                 (1,2020-10-14) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Valeria Mendoza |               (10, 2020-06-02) |"));
+                        "|                Valeria Mendoza |                (10,2020-06-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                   Ellen Ortega |              (100, 2020-06-18) |"));
+                        "|                   Ellen Ortega |               (100,2020-06-18) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Leann Holloway |                (9, 2020-05-26) |"));
+                        "|                 Leann Holloway |                 (9,2020-05-26) |"));
     }
 
     private void testExecuteInternalRowMergerFunction(String consoleOutput) {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Guillermo Smith | (1992-12-12, New Jersey, 81... |"));
+                        "|                Guillermo Smith | (1992-12-12,New Jersey,816-... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                Valeria Mendoza | (1970-03-28, Los Angeles, 9... |"));
+                        "|                Valeria Mendoza | (1970-03-28,Los Angeles,928... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "|                 Leann Holloway | (1989-05-21, Eugene, 614-88... |"));
+                        "|                 Leann Holloway | (1989-05-21,Eugene,614-889-... |"));
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 5efa77f..0a8d417 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -386,6 +386,15 @@ public class ExecutionConfigOptions {
                                                     + "Pipelined shuffle means data will be sent to consumer tasks once produced.")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
+            key("table.exec.sink.legacy-cast-behaviour")
+                    .enumType(LegacyCastBehaviour.class)
+                    .defaultValue(LegacyCastBehaviour.ENABLED)
+                    .withDescription(
+                            "Determines whether CAST will operate following the legacy behaviour "
+                                    + "or the new one that introduces various fixes and improvements.");
+
     // ------------------------------------------------------------------------------------------
     // Enum option types
     // ------------------------------------------------------------------------------------------
@@ -453,4 +462,29 @@ public class ExecutionConfigOptions {
         /** Add materialize operator in any case. */
         FORCE
     }
+
+    /** Determine if CAST operates using the legacy behaviour or the new one. */
+    @Deprecated
+    public enum LegacyCastBehaviour implements DescribedEnum {
+        ENABLED(true, text("CAST will operate following the legacy behaviour.")),
+        DISABLED(false, text("CAST will operate following the new correct behaviour."));
+
+        private final boolean enabled;
+        private final InlineElement description;
+
+        LegacyCastBehaviour(boolean enabled, InlineElement description) {
+            this.enabled = enabled;
+            this.description = description;
+        }
+
+        @Internal
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+
+        public boolean isEnabled() {
+            return enabled;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 575fab7..98fcf8b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -57,6 +57,7 @@ final class CollectDynamicSink implements DynamicTableSink {
     private final Duration socketTimeout;
     private final ClassLoader classLoader;
     private final ZoneId sessionZoneId;
+    private final boolean legacyCastBehaviour;
 
     // mutable attributes
     private CollectResultIterator<RowData> iterator;
@@ -68,18 +69,21 @@ final class CollectDynamicSink implements DynamicTableSink {
             MemorySize maxBatchSize,
             Duration socketTimeout,
             ClassLoader classLoader,
-            ZoneId sessionZoneId) {
+            ZoneId sessionZoneId,
+            boolean legacyCastBehaviour) {
         this.tableIdentifier = tableIdentifier;
         this.consumedDataType = consumedDataType;
         this.maxBatchSize = maxBatchSize;
         this.socketTimeout = socketTimeout;
         this.classLoader = classLoader;
         this.sessionZoneId = sessionZoneId;
+        this.legacyCastBehaviour = legacyCastBehaviour;
     }
 
     public ResultProvider getSelectResultProvider() {
         return new CollectResultProvider(
-                new RowDataToStringConverterImpl(consumedDataType, sessionZoneId, classLoader));
+                new RowDataToStringConverterImpl(
+                        consumedDataType, sessionZoneId, classLoader, legacyCastBehaviour));
     }
 
     @Override
@@ -132,7 +136,8 @@ final class CollectDynamicSink implements DynamicTableSink {
                 maxBatchSize,
                 socketTimeout,
                 classLoader,
-                sessionZoneId);
+                sessionZoneId,
+                legacyCastBehaviour);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 87c432e..acb557a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
@@ -115,7 +116,10 @@ public final class DynamicSinkUtils {
                         configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                         configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                         classLoader,
-                        zoneId);
+                        zoneId,
+                        configuration
+                                .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
+                                .isEnabled());
         collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
         collectModifyOperation.setConsumedDataType(consumedDataType);
         return convertSinkToRel(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
index 9e068b8..209ee8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
@@ -182,6 +182,11 @@ abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<I
         }
 
         @Override
+        public boolean legacyBehaviour() {
+            return castRuleCtx.legacyBehaviour();
+        }
+
+        @Override
         public String getSessionTimeZoneTerm() {
             return "java.util.TimeZone.getTimeZone(\""
                     + castRuleCtx.getSessionZoneId().getId()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 52c0b85..a737678 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -104,6 +104,11 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
             CastRule.Context ctx) {
         return new CodeGeneratorCastRule.Context() {
             @Override
+            public boolean legacyBehaviour() {
+                return ctx.legacyBehaviour();
+            }
+
+            @Override
             public String getSessionTimeZoneTerm() {
                 return "java.util.TimeZone.getTimeZone(\"" + ctx.getSessionZoneId().getId() + "\")";
             }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index 58217e4..78d6f2d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -49,14 +49,22 @@ public interface CastRule<IN, OUT> {
 
     /** Casting context. */
     interface Context {
+        @Deprecated
+        boolean legacyBehaviour();
+
         ZoneId getSessionZoneId();
 
         ClassLoader getClassLoader();
 
         /** Create a casting context. */
-        static Context create(ZoneId zoneId, ClassLoader classLoader) {
+        static Context create(boolean legacyBehaviour, ZoneId zoneId, ClassLoader classLoader) {
             return new Context() {
                 @Override
+                public boolean legacyBehaviour() {
+                    return legacyBehaviour;
+                }
+
+                @Override
                 public ZoneId getSessionZoneId() {
                     return zoneId;
                 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
index 2f9de82..1b189fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java
@@ -44,6 +44,10 @@ public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> {
 
     /** Context for code generation. */
     interface Context {
+        /** @return whether the legacy behaviour should be followed or not. */
+        @Deprecated
+        boolean legacyBehaviour();
+
         /** @return the session time zone term */
         String getSessionTimeZoneTerm();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
index 62c8e53..b3be464 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
@@ -45,11 +45,13 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
         this(
                 dataType,
                 DateTimeUtils.UTC_ZONE.toZoneId(),
-                Thread.currentThread().getContextClassLoader());
+                Thread.currentThread().getContextClassLoader(),
+                true);
     }
 
     @SuppressWarnings("unchecked")
-    public RowDataToStringConverterImpl(DataType dataType, ZoneId zoneId, ClassLoader classLoader) {
+    public RowDataToStringConverterImpl(
+            DataType dataType, ZoneId zoneId, ClassLoader classLoader, boolean legacyBehaviour) {
         List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType);
         this.columnConverters = new Function[rowDataTypes.size()];
 
@@ -60,7 +62,7 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver
             CastExecutor<Object, StringData> castExecutor =
                     (CastExecutor<Object, StringData>)
                             CastRuleProvider.create(
-                                    CastRule.Context.create(zoneId, classLoader),
+                                    CastRule.Context.create(legacyBehaviour, zoneId, classLoader),
                                     fieldType,
                                     STRING().getLogicalType());
             if (castExecutor == null) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
index baa5119..911d262 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
@@ -125,7 +125,8 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
 
             // Write the comma
             if (fieldIndex != 0) {
-                writer.stmt(methodCall(builderTerm, "append", strLiteral(", ")));
+                final String comma = getDelimiter(context);
+                writer.stmt(methodCall(builderTerm, "append", comma));
             }
 
             writer
@@ -167,4 +168,14 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
 
         return writer.toString();
     }
+
+    private String getDelimiter(CodeGeneratorCastRule.Context context) {
+        final String comma;
+        if (context.legacyBehaviour()) {
+            comma = strLiteral(",");
+        } else {
+            comma = strLiteral(", ");
+        }
+        return comma;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index b21d097..7a72e25 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -54,6 +54,8 @@ object CodeGenUtils {
 
   // ------------------------------- DEFAULT TERMS ------------------------------------------
 
+  val DEFAULT_LEGACY_CAST_BEHAVIOUR = "legacyCastBehaviour"
+
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
   val DEFAULT_INPUT1_TERM = "in1"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index fd5da47..045976c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.data.util.MapDataUtil
 import org.apache.flink.table.data.utils.CastExecutor
@@ -2162,6 +2163,7 @@ object ScalarOperatorGens {
 
   def toCodegenCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
     new CodeGeneratorCastRule.Context {
+      override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
       override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
       override def declareVariable(ty: String, variablePrefix: String): String =
         ctx.addReusableLocalVariable(ty, variablePrefix)
@@ -2176,10 +2178,16 @@ object ScalarOperatorGens {
 
   def toCastContext(ctx: CodeGeneratorContext): CastRule.Context = {
     new CastRule.Context {
+      override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
+
       override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone
 
       override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
     }
   }
 
+  private def isLegacyCastBehaviourEnabled(ctx: CodeGeneratorContext) = {
+    ctx.tableConfig
+      .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index a0c66b5..6f74b45 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -95,7 +95,7 @@ class CastRulesTest {
     private static final ZoneId CET = ZoneId.of("CET");
 
     private static final CastRule.Context CET_CONTEXT =
-            CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
+            CastRule.Context.create(false, CET, Thread.currentThread().getContextClassLoader());
 
     private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
     private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -614,7 +614,14 @@ class CastRulesTest {
                                 ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
                                 GenericRowData.of(
                                         StringData.fromString("abc"), StringData.fromString("def")),
-                                StringData.fromString("(abc, def)"))
+                                StringData.fromString("(abc, def)"),
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
+                                GenericRowData.of(
+                                        StringData.fromString("abc"), StringData.fromString("def")),
+                                StringData.fromString("(abc,def)"),
+                                true)
                         .fromCase(
                                 ROW(FIELD("f0", INT().nullable()), FIELD("f1", STRING())),
                                 GenericRowData.of(null, StringData.fromString("abc")),
@@ -860,9 +867,15 @@ class CastRulesTest {
         }
 
         private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object target) {
+            return fromCase(dataType, src, target, false);
+        }
+
+        private CastTestSpecBuilder fromCase(
+                DataType dataType, Object src, Object target, boolean legacyBehaviour) {
             return fromCase(
                     dataType,
                     CastRule.Context.create(
+                            legacyBehaviour,
                             DateTimeUtils.UTC_ZONE.toZoneId(),
                             Thread.currentThread().getContextClassLoader()),
                     src,
@@ -900,6 +913,7 @@ class CastRulesTest {
             return fail(
                     dataType,
                     CastRule.Context.create(
+                            false,
                             DateTimeUtils.UTC_ZONE.toZoneId(),
                             Thread.currentThread().getContextClassLoader()),
                     src,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 5204b94..b62f158 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
@@ -50,6 +51,7 @@ import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.logical.{RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
+
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
@@ -98,6 +100,10 @@ abstract class ExpressionTestBase {
 
   @Before
   def prepare(): Unit = {
+    config.getConfiguration.set(
+      ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+      ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
+    )
     if (containsLegacyTypes) {
       val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
       tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)

[flink] 02/02: [hotfix][table-planner] Add class header comment to generated code

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76b47b219dab18ddb41beb29fa8664110bdb891d
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Thu Dec 2 12:25:27 2021 +0100

    [hotfix][table-planner] Add class header comment to generated code
    
    Use a class header comment to add useful configuration variables that can
    help debugging a generated class code. Added timezone and legacy behaviour
    info in this comment on the generated class implementing CAST.
---
 .../planner/codegen/CodeGeneratorContext.scala     | 25 ++++++++++++++++++++++
 .../planner/codegen/FunctionCodeGenerator.scala    |  1 +
 .../planner/codegen/calls/ScalarOperatorGens.scala |  6 ++++++
 3 files changed, 32 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 3be8032..6ddcbe7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -51,6 +51,10 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   // holding a list of objects that could be used passed into generated class
   val references: mutable.ArrayBuffer[AnyRef] = new mutable.ArrayBuffer[AnyRef]()
 
+  // set of strings (lines) that will be concatenated into a single class header comment
+  private val reusableHeaderComments: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
   private val reusableMemberStatements: mutable.LinkedHashSet[String] =
@@ -143,6 +147,16 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 
   def nullCheck: Boolean = tableConfig.getNullCheck
 
+
+  /**
+    * Add a line comment to the [[reusableHeaderComments]] set; the lines will be concatenated
+    * into a single class header comment.
+    * @param comment The comment to add for class header
+    */
+  def addReusableHeaderComment(comment: String): Unit = {
+    reusableHeaderComments.add(comment)
+  }
+
   // ---------------------------------------------------------------------------------
   // Local Variables for Code Split
   // ---------------------------------------------------------------------------------
@@ -197,6 +211,17 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   // ---------------------------------------------------------------------------------
 
   /**
+    * @return Comment to be added as a header comment on the generated class
+    */
+  def getClassHeaderComment(): String = {
+    s"""
+    |/*
+    | * ${reusableHeaderComments.mkString("\n * ")}
+    | */
+    """.stripMargin
+  }
+
+  /**
     * @return code block of statements that need to be placed in the member area of the class
     *         (e.g. inner class definition)
     */
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 44a4c23..24c286f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -125,6 +125,7 @@ object FunctionCodeGenerator {
 
     val funcCode =
       j"""
+      ${ctx.getClassHeaderComment()}
       public class $funcName
           extends ${samHeader._1.getCanonicalName} {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 045976c..61252f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -937,6 +937,12 @@ object ScalarOperatorGens {
       operand: GeneratedExpression,
       targetType: LogicalType)
     : GeneratedExpression = {
+
+    ctx.addReusableHeaderComment(
+      s"Using option '${ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR.key()}':" +
+        s"'${isLegacyCastBehaviourEnabled(ctx)}'")
+    ctx.addReusableHeaderComment("Timezone: " + ctx.tableConfig.getLocalTimeZone)
+
     // Try to use the new cast rules
     val rule = CastRuleProvider.resolve(operand.resultType, targetType)
     rule match {