You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/03 08:52:42 UTC

[GitHub] [flink] twalthr commented on a change in pull request #18221: [FLINK-24803][table-planner] Fix cast BINARY/VARBINARY to STRING

twalthr commented on a change in pull request #18221:
URL: https://github.com/apache/flink/pull/18221#discussion_r777348593



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
##########
@@ -20,35 +20,114 @@
 
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
-import java.nio.charset.StandardCharsets;
-
-import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayElement;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayLength;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.stringExceedsLength;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
  */
-class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {
+class BinaryToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<byte[], String> {
 
     static final BinaryToStringCastRule INSTANCE = new BinaryToStringCastRule();
 
     private BinaryToStringCastRule() {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.BINARY_STRING)
-                        .target(STRING_TYPE)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
                         .build());
     }
 
+    /* Example generated code
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.setLength(0);
+        builder$1.append("[");
+        for (int i$2 = 0; i$2 < _myInput.length; i$2++) {
+            if (i$2 != 0) {
+                builder$1.append(", ");
+            }
+            builder$1.append(_myInput[i$2]);
+        }
+        builder$1.append("]");
+        java.lang.String resultString$152;
+        resultString$152 = builder$1.toString();
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$152);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+     */
+
     @Override
-    public String generateStringExpression(
+    protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
             String inputTerm,
+            String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return constructorCall(
-                String.class, inputTerm, accessStaticField(StandardCharsets.class, "UTF_8"));
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        final String resultStringTerm = newName("resultString");
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, constructorCall(StringBuilder.class));
+
+        CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(methodCall(builderTerm, "setLength", 0))
+                        .stmt(methodCall(builderTerm, "append", strLiteral("[")))
+                        .forStmt(
+                                arrayLength(inputTerm),
+                                (indexTerm, loopBodyWriter) -> {
+                                    if (!context.legacyBehaviour() && couldTrim(length)) {

Review comment:
       I guess we don't need the `context.legacyBehaviour()` check here, since we are changing the string representation anyway.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
##########
@@ -20,35 +20,114 @@
 
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
-import java.nio.charset.StandardCharsets;
-
-import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayElement;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayLength;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.stringExceedsLength;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
  */
-class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {
+class BinaryToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<byte[], String> {
 
     static final BinaryToStringCastRule INSTANCE = new BinaryToStringCastRule();
 
     private BinaryToStringCastRule() {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.BINARY_STRING)
-                        .target(STRING_TYPE)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
                         .build());
     }
 
+    /* Example generated code
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.setLength(0);
+        builder$1.append("[");
+        for (int i$2 = 0; i$2 < _myInput.length; i$2++) {
+            if (i$2 != 0) {
+                builder$1.append(", ");
+            }
+            builder$1.append(_myInput[i$2]);
+        }
+        builder$1.append("]");
+        java.lang.String resultString$152;
+        resultString$152 = builder$1.toString();
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$152);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+     */
+
     @Override
-    public String generateStringExpression(
+    protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
             String inputTerm,
+            String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return constructorCall(
-                String.class, inputTerm, accessStaticField(StandardCharsets.class, "UTF_8"));
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        final String resultStringTerm = newName("resultString");
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, constructorCall(StringBuilder.class));
+
+        CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(methodCall(builderTerm, "setLength", 0))
+                        .stmt(methodCall(builderTerm, "append", strLiteral("[")))
+                        .forStmt(
+                                arrayLength(inputTerm),
+                                (indexTerm, loopBodyWriter) -> {
+                                    if (!context.legacyBehaviour() && couldTrim(length)) {
+                                        // Break if the target length is already exceeded
+                                        loopBodyWriter.ifStmt(
+                                                stringExceedsLength(builderTerm, length),
+                                                thenBodyWriter -> thenBodyWriter.stmt("break"));

Review comment:
       Could this use a dedicated `breakStmt` instead of `stmt("break")`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org