You are viewing a plain-text version of this content. The canonical link to it is not reproduced in this plain-text copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/18 12:52:48 UTC

[flink] branch release-1.9 updated: [FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 0a378bb  [FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding
0a378bb is described below

commit 0a378bbd04b029ca269b3137f565d02eacdc962e
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 15 17:39:00 2019 +0200

    [FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding
    
    This closes #9455
---
 .../table/planner/codegen/calls/PrintCallGen.scala     |  5 ++++-
 .../runtime/utils/JavaUserDefinedTableFunctions.java   |  3 ++-
 .../expressions/utils/ScalarTypesTestBase.scala        |  6 ++++--
 .../table/planner/runtime/batch/sql/CalcITCase.scala   | 18 ++++++++++++------
 .../planner/utils/UserDefinedTableFunctions.scala      | 15 ---------------
 .../flink/table/dataformat/AbstractBinaryWriter.java   |  3 ++-
 .../table/dataformat/vector/VectorizedColumnBatch.java |  3 ++-
 .../table/runtime/functions/SqlFunctionUtils.java      |  2 +-
 .../flink/table/runtime/util/StringUtf8Utils.java      |  7 ++-----
 .../apache/flink/table/dataformat/BinaryRowTest.java   |  3 ++-
 .../flink/table/dataformat/BinaryStringTest.java       |  2 +-
 .../dataformat/vector/VectorizedColumnBatchTest.java   |  4 +++-
 12 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
index e34b88a..a9b420e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
@@ -23,6 +23,8 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedEx
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isBinaryString
 import org.apache.flink.table.types.logical.LogicalType
 
+import java.nio.charset.StandardCharsets
+
 /**
   * Generates PRINT function call.
   */
@@ -40,8 +42,9 @@ class PrintCallGen extends CallGenerator {
     val logTerm = "logger$"
     ctx.addReusableLogger(logTerm, "_Print$_")
 
+    val charsets = classOf[StandardCharsets].getCanonicalName
     val outputCode = if (isBinaryString(returnType)) {
-      s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
+      s"new String($resultTerm, $charsets.UTF_8)"
     } else {
       s"String.valueOf(${operands(1).resultTerm})"
     }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 95e4c78..7cbe1b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.functions.TableFunction;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Random;
 
 /**
@@ -108,7 +109,7 @@ public class JavaUserDefinedTableFunctions {
 
 		public void eval(byte[] varbinary) {
 			if (varbinary != null) {
-				this.eval(new String(varbinary));
+				this.eval(new String(varbinary, StandardCharsets.UTF_8));
 			}
 		}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
index 062c3d8..b296cb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.types.Row
 
+import java.nio.charset.StandardCharsets
+
 abstract class ScalarTypesTestBase extends ExpressionTestBase {
 
   override def testData: Row = {
@@ -83,8 +85,8 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
     testData.setField(50, localDate("1997-11-11"))
     testData.setField(51, localTime("09:44:55"))
     testData.setField(52, localDateTime("1997-11-11 09:44:55.333"))
-    testData.setField(53, "hello world".getBytes)
-    testData.setField(54, "This is a testing string.".getBytes)
+    testData.setField(53, "hello world".getBytes(StandardCharsets.UTF_8))
+    testData.setField(54, "This is a testing string.".getBytes(StandardCharsets.UTF_8))
     testData
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 1bb3711..a583b8c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -45,6 +45,7 @@ import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit._
 
+import java.nio.charset.StandardCharsets
 import java.sql.{Date, Time, Timestamp}
 import java.time.{LocalDate, LocalDateTime}
 import java.util
@@ -361,7 +362,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testBinary(): Unit = {
-    val data = Seq(row(1, 2, "hehe".getBytes))
+    val data = Seq(row(1, 2, "hehe".getBytes(StandardCharsets.UTF_8)))
     registerCollection(
       "MyTable",
       data,
@@ -1170,13 +1171,15 @@ class CalcITCase extends BatchTestBase {
   def testCalcBinary(): Unit = {
     registerCollection(
       "BinaryT",
-      nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+      nullData3.map((r) => row(r.getField(0), r.getField(1),
+        r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
     checkResult(
       "select a, b, c from BinaryT where b < 1000",
-      nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes))
+      nullData3.map((r) => row(r.getField(0), r.getField(1),
+        r.getField(2).toString.getBytes(StandardCharsets.UTF_8)))
     )
   }
 
@@ -1184,7 +1187,8 @@ class CalcITCase extends BatchTestBase {
   def testOrderByBinary(): Unit = {
     registerCollection(
       "BinaryT",
-      nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+      nullData3.map((r) => row(r.getField(0), r.getField(1),
+        r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
@@ -1196,7 +1200,8 @@ class CalcITCase extends BatchTestBase {
       "select * from BinaryT order by c",
       nullData3.sortBy((x : Row) =>
         x.getField(2).asInstanceOf[String]).map((r) =>
-        row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+        row(r.getField(0), r.getField(1),
+          r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       isSorted = true
     )
   }
@@ -1205,7 +1210,8 @@ class CalcITCase extends BatchTestBase {
   def testGroupByBinary(): Unit = {
     registerCollection(
       "BinaryT2",
-      nullData3.map((r) => row(r.getField(0), r.getField(1).toString.getBytes, r.getField(2))),
+      nullData3.map((r) => row(r.getField(0),
+        r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
       new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
index 8ae75c3..eded5f4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
@@ -113,21 +113,6 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
-@SerialVersionUID(1L)
-class TableFunc5 extends TableFunction[Tuple2[String, Int]] {
-  def eval(bytes: Array[Byte]) {
-    if (null != bytes) {
-      collect(new Tuple2(new String(bytes), bytes.length))
-    }
-  }
-
-  def eval(str: String) {
-    if (null != str) {
-      collect(new Tuple2(str, str.length))
-    }
-  }
-}
-
 //TODO support dynamic type
 //class UDTFWithDynamicType extends TableFunction[Row] {
 //
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index b70cc4e..5091c3c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.runtime.util.SegmentsUtil;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
@@ -75,7 +76,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 	public void writeString(int pos, BinaryString input) {
 		if (input.getSegments() == null) {
 			String javaObject = input.getJavaObject();
-			writeBytes(pos, javaObject.getBytes());
+			writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
 		} else {
 			int len = input.getSizeInBytes();
 			if (len <= 7) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
index e6dc2fb..122d359 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
 
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 
 /**
  * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
@@ -114,7 +115,7 @@ public class VectorizedColumnBatch implements Serializable {
 
 	public String getString(int rowId, int colId) {
 		Bytes byteArray = getByteArray(rowId, colId);
-		return new String(byteArray.data, byteArray.offset, byteArray.len);
+		return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
 	}
 
 	public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 9a993a5..a9cba63 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -486,7 +486,7 @@ public class SqlFunctionUtils {
 			}
 		}
 		if (byteArray == null) {
-			byteArray = str.getBytes();
+			byteArray = str.getBytes(StandardCharsets.UTF_8);
 		}
 		return byteArray;
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
index f67141b..5a84ec3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util;
 import org.apache.flink.core.memory.MemorySegment;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import static org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseBytes;
@@ -297,10 +298,6 @@ public class StringUtf8Utils {
 	}
 
 	public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
-		try {
-			return new String(bytes, offset, len, "UTF-8");
-		} catch (UnsupportedEncodingException e) {
-			throw new RuntimeException("encodeUTF8 error", e);
-		}
+		return new String(bytes, offset, len, StandardCharsets.UTF_8);
 	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index fb437c8..fa870b4 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -48,6 +48,7 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -182,7 +183,7 @@ public class BinaryRowTest {
 			BinaryRow row = new BinaryRow(2);
 			BinaryRowWriter writer = new BinaryRowWriter(row);
 			writer.writeString(0, fromString(str));
-			writer.writeString(1, fromBytes(str.getBytes()));
+			writer.writeString(1, fromBytes(str.getBytes(StandardCharsets.UTF_8)));
 			writer.complete();
 
 			assertEquals(str, row.getString(0).toString());
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index a3e2c63..015c7ff 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -724,7 +724,7 @@ public class BinaryStringTest {
 		byte[] bytes = new byte[] {(byte) 20122, (byte) 40635, 124, (byte) 38271, (byte) 34966,
 			124, (byte) 36830, (byte) 34915, (byte) 35033, 124, (byte) 55357, 124, (byte) 56407 };
 
-		String str = new String(bytes);
+		String str = new String(bytes, StandardCharsets.UTF_8);
 		assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
 		assertEquals(str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
index 049b934..53a26d1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.dataformat.vector.heap.HeapShortVector;
 
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -49,7 +51,7 @@ public class VectorizedColumnBatchTest {
 
 		HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
 		for (int i = 0; i < VECTOR_SIZE; i++) {
-			byte[] bytes = String.valueOf(i).getBytes();
+			byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
 			col1.setVal(i, bytes, 0, bytes.length);
 		}