You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/18 12:43:09 UTC
[flink] branch master updated: [FLINK-13739][table-blink] JDK
String to bytes should specify UTF-8 encoding
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 130f4e8 [FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding
130f4e8 is described below
commit 130f4e85a0dc7498e67e591155aa0b0470b5950d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 15 17:39:00 2019 +0200
[FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding
This closes #9455
---
.../table/planner/codegen/calls/PrintCallGen.scala | 5 ++++-
.../runtime/utils/JavaUserDefinedTableFunctions.java | 3 ++-
.../expressions/utils/ScalarTypesTestBase.scala | 6 ++++--
.../table/planner/runtime/batch/sql/CalcITCase.scala | 18 ++++++++++++------
.../planner/utils/UserDefinedTableFunctions.scala | 15 ---------------
.../flink/table/dataformat/AbstractBinaryWriter.java | 3 ++-
.../table/dataformat/vector/VectorizedColumnBatch.java | 3 ++-
.../table/runtime/functions/SqlFunctionUtils.java | 2 +-
.../flink/table/runtime/util/StringUtf8Utils.java | 7 ++-----
.../apache/flink/table/dataformat/BinaryRowTest.java | 3 ++-
.../flink/table/dataformat/BinaryStringTest.java | 2 +-
.../dataformat/vector/VectorizedColumnBatchTest.java | 4 +++-
12 files changed, 35 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
index e34b88a..a9b420e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/PrintCallGen.scala
@@ -23,6 +23,8 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedEx
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isBinaryString
import org.apache.flink.table.types.logical.LogicalType
+import java.nio.charset.StandardCharsets
+
/**
* Generates PRINT function call.
*/
@@ -40,8 +42,9 @@ class PrintCallGen extends CallGenerator {
val logTerm = "logger$"
ctx.addReusableLogger(logTerm, "_Print$_")
+ val charsets = classOf[StandardCharsets].getCanonicalName
val outputCode = if (isBinaryString(returnType)) {
- s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
+ s"new String($resultTerm, $charsets.UTF_8)"
} else {
s"String.valueOf(${operands(1).resultTerm})"
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 95e4c78..7cbe1b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.Random;
/**
@@ -108,7 +109,7 @@ public class JavaUserDefinedTableFunctions {
public void eval(byte[] varbinary) {
if (varbinary != null) {
- this.eval(new String(varbinary));
+ this.eval(new String(varbinary, StandardCharsets.UTF_8));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
index 062c3d8..b296cb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarTypesTestBase.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row
+import java.nio.charset.StandardCharsets
+
abstract class ScalarTypesTestBase extends ExpressionTestBase {
override def testData: Row = {
@@ -83,8 +85,8 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(50, localDate("1997-11-11"))
testData.setField(51, localTime("09:44:55"))
testData.setField(52, localDateTime("1997-11-11 09:44:55.333"))
- testData.setField(53, "hello world".getBytes)
- testData.setField(54, "This is a testing string.".getBytes)
+ testData.setField(53, "hello world".getBytes(StandardCharsets.UTF_8))
+ testData.setField(54, "This is a testing string.".getBytes(StandardCharsets.UTF_8))
testData
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 63499a4..3bb462d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -45,6 +45,7 @@ import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit._
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util
@@ -361,7 +362,7 @@ class CalcITCase extends BatchTestBase {
@Test
def testBinary(): Unit = {
- val data = Seq(row(1, 2, "hehe".getBytes))
+ val data = Seq(row(1, 2, "hehe".getBytes(StandardCharsets.UTF_8)))
registerCollection(
"MyTable",
data,
@@ -1170,13 +1171,15 @@ class CalcITCase extends BatchTestBase {
def testCalcBinary(): Unit = {
registerCollection(
"BinaryT",
- nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+ nullData3.map((r) => row(r.getField(0), r.getField(1),
+ r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
checkResult(
"select a, b, c from BinaryT where b < 1000",
- nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes))
+ nullData3.map((r) => row(r.getField(0), r.getField(1),
+ r.getField(2).toString.getBytes(StandardCharsets.UTF_8)))
)
}
@@ -1184,7 +1187,8 @@ class CalcITCase extends BatchTestBase {
def testOrderByBinary(): Unit = {
registerCollection(
"BinaryT",
- nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+ nullData3.map((r) => row(r.getField(0), r.getField(1),
+ r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
@@ -1196,7 +1200,8 @@ class CalcITCase extends BatchTestBase {
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
x.getField(2).asInstanceOf[String]).map((r) =>
- row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+ row(r.getField(0), r.getField(1),
+ r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
isSorted = true
)
}
@@ -1205,7 +1210,8 @@ class CalcITCase extends BatchTestBase {
def testGroupByBinary(): Unit = {
registerCollection(
"BinaryT2",
- nullData3.map((r) => row(r.getField(0), r.getField(1).toString.getBytes, r.getField(2))),
+ nullData3.map((r) => row(r.getField(0),
+ r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
index 8ae75c3..eded5f4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
@@ -113,21 +113,6 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
}
-@SerialVersionUID(1L)
-class TableFunc5 extends TableFunction[Tuple2[String, Int]] {
- def eval(bytes: Array[Byte]) {
- if (null != bytes) {
- collect(new Tuple2(new String(bytes), bytes.length))
- }
- }
-
- def eval(str: String) {
- if (null != str) {
- collect(new Tuple2(str, str.length))
- }
- }
-}
-
//TODO support dynamic type
//class UDTFWithDynamicType extends TableFunction[Row] {
//
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index b70cc4e..5091c3c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.runtime.util.SegmentsUtil;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
@@ -75,7 +76,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
public void writeString(int pos, BinaryString input) {
if (input.getSegments() == null) {
String javaObject = input.getJavaObject();
- writeBytes(pos, javaObject.getBytes());
+ writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
} else {
int len = input.getSizeInBytes();
if (len <= 7) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
index e6dc2fb..122d359 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
/**
* A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
@@ -114,7 +115,7 @@ public class VectorizedColumnBatch implements Serializable {
public String getString(int rowId, int colId) {
Bytes byteArray = getByteArray(rowId, colId);
- return new String(byteArray.data, byteArray.offset, byteArray.len);
+ return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
}
public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 9a993a5..a9cba63 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -486,7 +486,7 @@ public class SqlFunctionUtils {
}
}
if (byteArray == null) {
- byteArray = str.getBytes();
+ byteArray = str.getBytes(StandardCharsets.UTF_8);
}
return byteArray;
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
index f67141b..5a84ec3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StringUtf8Utils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util;
import org.apache.flink.core.memory.MemorySegment;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseBytes;
@@ -297,10 +298,6 @@ public class StringUtf8Utils {
}
public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
- try {
- return new String(bytes, offset, len, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("encodeUTF8 error", e);
- }
+ return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index fb437c8..fa870b4 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -48,6 +48,7 @@ import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@@ -182,7 +183,7 @@ public class BinaryRowTest {
BinaryRow row = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.writeString(0, fromString(str));
- writer.writeString(1, fromBytes(str.getBytes()));
+ writer.writeString(1, fromBytes(str.getBytes(StandardCharsets.UTF_8)));
writer.complete();
assertEquals(str, row.getString(0).toString());
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index a3e2c63..015c7ff 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -724,7 +724,7 @@ public class BinaryStringTest {
byte[] bytes = new byte[] {(byte) 20122, (byte) 40635, 124, (byte) 38271, (byte) 34966,
124, (byte) 36830, (byte) 34915, (byte) 35033, 124, (byte) 55357, 124, (byte) 56407 };
- String str = new String(bytes);
+ String str = new String(bytes, StandardCharsets.UTF_8);
assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
assertEquals(str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
index 049b934..53a26d1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.dataformat.vector.heap.HeapShortVector;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -49,7 +51,7 @@ public class VectorizedColumnBatchTest {
HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
for (int i = 0; i < VECTOR_SIZE; i++) {
- byte[] bytes = String.valueOf(i).getBytes();
+ byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
col1.setVal(i, bytes, 0, bytes.length);
}