You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/25 07:07:17 UTC
[flink] branch release-1.11 updated: [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f5ac8c3 [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
f5ac8c3 is described below
commit f5ac8c352b7fb5aff5a78cffa348f72bd8492509
Author: zoudan <zo...@bytedance.com>
AuthorDate: Tue Jun 9 16:12:11 2020 +0800
[FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
This closes #12542.
---
.../data/conversion/ArrayObjectArrayConverter.java | 2 +-
.../flink/table/data/util/DataFormatConverters.java | 2 +-
.../flink/table/data/DataFormatConvertersTest.java | 12 +++++++++++-
.../flink/table/data/DataStructureConvertersTest.java | 19 +++++++++++++++++++
4 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index 5049064..c191c55 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -115,7 +115,7 @@ public class ArrayObjectArrayConverter<E> implements DataStructureConverter<Arra
for (int pos = 0; pos < length; pos++) {
writeElement(pos, external[pos]);
}
- return completeWriter();
+ return completeWriter().copy();
}
private E[] toJavaArray(ArrayData internal) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 347a517..acef60d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -1138,7 +1138,7 @@ public class DataFormatConverters {
}
}
reuseWriter.complete();
- return reuseArray;
+ return reuseArray.copy();
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index 82e33d1..ac53aa2 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -148,9 +148,18 @@ public class DataFormatConvertersTest {
}
private static void test(TypeInformation typeInfo, Object value) {
+ test(typeInfo, value, null);
+ }
+
+ private static void test(TypeInformation typeInfo, Object value, Object anotherValue) {
DataFormatConverter converter = getConverter(typeInfo);
+ final Object innerValue = converter.toInternal(value);
+ if (anotherValue != null) {
+ converter.toInternal(anotherValue);
+ }
+
Assert.assertTrue(Arrays.deepEquals(
- new Object[] {converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
+ new Object[] {converter.toExternal(innerValue)}, new Object[]{value}));
}
private static DataFormatConverter getConverter(DataType dataType) {
@@ -193,6 +202,7 @@ public class DataFormatConvertersTest {
test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] {null, null});
test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {null, null});
test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"});
+ test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"}, new String[] {"aa", "bb"});
test(new MapTypeInfo<>(Types.STRING, Types.INT), null);
HashMap<String, Integer> map = new HashMap<>();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 610d53c..29710fa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -345,6 +345,12 @@ public class DataStructureConvertersTest {
null
)
})
+ .convertedToWithAnotherValue(
+ Row[].class,
+ new Row[] {
+ Row.of(null, null),
+ Row.of(new PojoWithImmutableFields(10, "Bob"), null)
+ })
);
}
@@ -369,6 +375,11 @@ public class DataStructureConvertersTest {
final Object internalValue = fromConverter.toInternalOrNull(from.getValue());
+ final Object anotherValue = testSpec.conversionsWithAnotherValue.get(from.getKey());
+ if (anotherValue != null) {
+ fromConverter.toInternalOrNull(anotherValue);
+ }
+
for (Map.Entry<Class<?>, Object> to : testSpec.conversions.entrySet()) {
final DataType toDataType = testSpec.dataType.bridgedTo(to.getKey());
@@ -395,12 +406,15 @@ public class DataStructureConvertersTest {
private final Map<Class<?>, Object> conversions;
+ private final Map<Class<?>, Object> conversionsWithAnotherValue;
+
private @Nullable String expectedErrorMessage;
private TestSpec(String description, DataType dataType) {
this.description = description;
this.dataType = dataType;
this.conversions = new LinkedHashMap<>();
+ this.conversionsWithAnotherValue = new LinkedHashMap<>();
}
static TestSpec forDataType(AbstractDataType<?> dataType) {
@@ -420,6 +434,11 @@ public class DataStructureConvertersTest {
return this;
}
+ <T> TestSpec convertedToWithAnotherValue(Class<T> clazz, T value) {
+ conversionsWithAnotherValue.put(clazz, value);
+ return this;
+ }
+
<T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
conversions.put(clazz, supplier.get());
return this;