You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/31 11:56:37 UTC

[flink] 01/02: [hotfix][table-planner] Assume that length of source type is respected for CAST

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1fa98436adf0bc2f88021c63e3bf1c166deb1ae7
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Dec 21 13:54:25 2021 +0200

    [hotfix][table-planner] Assume that length of source type is respected for CAST
    
    When casting to CHAR/VARCHAR/BINARY/VARBINARY, we assume that
    the length of the source type CHAR/VARCHAR/BINARY/VARBINARY is
    respected, to avoid the performance overhead of applying checks and
    trimming at runtime. For example, if the input type is `VARCHAR(3)`, the
    input value is `foobar` (which exceeds the declared input length), and
    the target type is `VARCHAR(4)`, no trimming is applied and the result
    value remains `foobar`.
---
 .../functions/casting/BinaryToBinaryCastRule.java  |  1 +
 .../casting/CharVarCharTrimPadCastRule.java        | 23 ++++++++++++-----
 .../planner/functions/casting/CastRulesTest.java   | 30 ++++++++++++++++++++--
 3 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToBinaryCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToBinaryCastRule.java
index bd21f4e..9887818 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToBinaryCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToBinaryCastRule.java
@@ -63,6 +63,7 @@ class BinaryToBinaryCastRule extends AbstractExpressionCodeGeneratorCastRule<byt
         if (context.legacyBehaviour()) {
             return inputTerm;
         } else {
+            // Assume input length is respected by the source
             if (inputLength <= targetLength) {
                 return inputTerm;
             } else {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java
index 9d87074..df26517 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java
@@ -88,7 +88,11 @@ class CharVarCharTrimPadCastRule
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        final int targetLength = LogicalTypeChecks.getLength(targetLogicalType);
+        Integer inputLength = null;
+        if (inputLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            inputLength = LogicalTypeChecks.getLength(inputLogicalType);
+        }
         CastRule<?, ?> castRule =
                 CastRuleProvider.resolve(inputLogicalType, VarCharType.STRING_TYPE);
 
@@ -103,27 +107,32 @@ class CharVarCharTrimPadCastRule
 
             final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
             if (context.legacyBehaviour()
-                    || !(couldTrim(length) || couldPad(targetLogicalType, length))) {
+                    || ((!couldTrim(targetLength)
+                                    // Assume input length is respected by the source
+                                    || (inputLength != null && inputLength <= targetLength))
+                            && !couldPad(targetLogicalType, targetLength))) {
                 return writer.assignStmt(returnVariable, stringExpr).toString();
             }
             return writer.ifStmt(
-                            methodCall(stringExpr, "numChars") + " > " + length,
+                            methodCall(stringExpr, "numChars") + " > " + targetLength,
                             thenWriter ->
                                     thenWriter.assignStmt(
                                             returnVariable,
-                                            methodCall(stringExpr, "substring", 0, length)),
+                                            methodCall(stringExpr, "substring", 0, targetLength)),
                             elseWriter -> {
-                                if (couldPad(targetLogicalType, length)) {
+                                if (couldPad(targetLogicalType, targetLength)) {
                                     final String padLength = newName("padLength");
                                     final String padString = newName("padString");
                                     elseWriter.ifStmt(
-                                            methodCall(stringExpr, "numChars") + " < " + length,
+                                            methodCall(stringExpr, "numChars")
+                                                    + " < "
+                                                    + targetLength,
                                             thenInnerWriter ->
                                                     thenInnerWriter
                                                             .declStmt(int.class, padLength)
                                                             .assignStmt(
                                                                     padLength,
-                                                                    length
+                                                                    targetLength
                                                                             + " - "
                                                                             + methodCall(
                                                                                     stringExpr,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index fbde1d6..5e7dfdf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -771,6 +771,13 @@ class CastRulesTest {
                         .fromCaseLegacy(CHAR(6), fromString("Apache"), fromString("Apache"))
                         .fromCase(VARCHAR(5), fromString("Flink"), fromString("Fli"))
                         .fromCaseLegacy(VARCHAR(5), fromString("Flink"), fromString("Flink"))
+                        // We assume that the input length is respected, therefore, no trimming is
+                        // applied
+                        .fromCase(CHAR(2), fromString("Apache"), fromString("Apache"))
+                        .fromCaseLegacy(CHAR(2), fromString("Apache"), fromString("Apache"))
+                        .fromCase(VARCHAR(2), fromString("Apache"), fromString("Apache"))
+                        .fromCaseLegacy(VARCHAR(2), fromString("Apache"), fromString("Apache"))
+                        //
                         .fromCase(STRING(), fromString("Apache Flink"), fromString("Apa"))
                         .fromCaseLegacy(
                                 STRING(), fromString("Apache Flink"), fromString("Apache Flink"))
@@ -962,7 +969,13 @@ class CastRulesTest {
                         .fromCaseLegacy(
                                 STRING(),
                                 fromString("Apache"),
-                                new byte[] {65, 112, 97, 99, 104, 101}),
+                                new byte[] {65, 112, 97, 99, 104, 101})
+                        // We assume that the input length is respected, therefore, no trimming is
+                        // applied
+                        .fromCase(BINARY(2), new byte[] {1, 2, 3}, new byte[] {1, 2, 3})
+                        .fromCaseLegacy(BINARY(2), new byte[] {1, 2, 3}, new byte[] {1, 2, 3})
+                        .fromCase(VARBINARY(2), new byte[] {1, 2, 3}, new byte[] {1, 2, 3})
+                        .fromCaseLegacy(VARBINARY(2), new byte[] {1, 2, 3}, new byte[] {1, 2, 3}),
                 CastTestSpecBuilder.testCastTo(VARBINARY(4))
                         .fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
                         .fromCaseLegacy(
@@ -974,7 +987,20 @@ class CastRulesTest {
                         .fromCaseLegacy(
                                 STRING(),
                                 fromString("Apache"),
-                                new byte[] {65, 112, 97, 99, 104, 101}),
+                                new byte[] {65, 112, 97, 99, 104, 101})
+                        // We assume that the input length is respected, therefore, no trimming is
+                        // applied
+                        .fromCase(BINARY(2), new byte[] {1, 2, 3, 4, 5}, new byte[] {1, 2, 3, 4, 5})
+                        .fromCaseLegacy(
+                                BINARY(2), new byte[] {1, 2, 3, 4, 5}, new byte[] {1, 2, 3, 4, 5})
+                        .fromCase(
+                                VARBINARY(2),
+                                new byte[] {1, 2, 3, 4, 5},
+                                new byte[] {1, 2, 3, 4, 5})
+                        .fromCaseLegacy(
+                                VARBINARY(2),
+                                new byte[] {1, 2, 3, 4, 5},
+                                new byte[] {1, 2, 3, 4, 5}),
                 CastTestSpecBuilder.testCastTo(BYTES())
                         .fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
                         .fromCase(