You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/25 07:07:17 UTC

[flink] branch release-1.11 updated: [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f5ac8c3  [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
f5ac8c3 is described below

commit f5ac8c352b7fb5aff5a78cffa348f72bd8492509
Author: zoudan <zo...@bytedance.com>
AuthorDate: Tue Jun 9 16:12:11 2020 +0800

    [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
    
    This closes #12542.
---
 .../data/conversion/ArrayObjectArrayConverter.java    |  2 +-
 .../flink/table/data/util/DataFormatConverters.java   |  2 +-
 .../flink/table/data/DataFormatConvertersTest.java    | 12 +++++++++++-
 .../flink/table/data/DataStructureConvertersTest.java | 19 +++++++++++++++++++
 4 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index 5049064..c191c55 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -115,7 +115,7 @@ public class ArrayObjectArrayConverter<E> implements DataStructureConverter<Arra
 		for (int pos = 0; pos < length; pos++) {
 			writeElement(pos, external[pos]);
 		}
-		return completeWriter();
+		return completeWriter().copy();
 	}
 
 	private E[] toJavaArray(ArrayData internal) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 347a517..acef60d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -1138,7 +1138,7 @@ public class DataFormatConverters {
 				}
 			}
 			reuseWriter.complete();
-			return reuseArray;
+			return reuseArray.copy();
 		}
 
 		@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index 82e33d1..ac53aa2 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -148,9 +148,18 @@ public class DataFormatConvertersTest {
 	}
 
 	private static void test(TypeInformation typeInfo, Object value) {
+		test(typeInfo, value, null);
+	}
+
+	private static void test(TypeInformation typeInfo, Object value, Object anotherValue) {
 		DataFormatConverter converter = getConverter(typeInfo);
+		final Object innerValue = converter.toInternal(value);
+		if (anotherValue != null) {
+			converter.toInternal(anotherValue);
+		}
+
 		Assert.assertTrue(Arrays.deepEquals(
-				new Object[] {converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
+			new Object[] {converter.toExternal(innerValue)}, new Object[]{value}));
 	}
 
 	private static DataFormatConverter getConverter(DataType dataType) {
@@ -193,6 +202,7 @@ public class DataFormatConvertersTest {
 		test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] {null, null});
 		test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {null, null});
 		test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"});
+		test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"}, new String[] {"aa", "bb"});
 		test(new MapTypeInfo<>(Types.STRING, Types.INT), null);
 
 		HashMap<String, Integer> map = new HashMap<>();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 610d53c..29710fa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -345,6 +345,12 @@ public class DataStructureConvertersTest {
 							null
 						)
 					})
+				.convertedToWithAnotherValue(
+					Row[].class,
+					new Row[] {
+						Row.of(null, null),
+						Row.of(new PojoWithImmutableFields(10, "Bob"), null)
+					})
 		);
 	}
 
@@ -369,6 +375,11 @@ public class DataStructureConvertersTest {
 
 			final Object internalValue = fromConverter.toInternalOrNull(from.getValue());
 
+			final Object anotherValue = testSpec.conversionsWithAnotherValue.get(from.getKey());
+			if (anotherValue != null) {
+				fromConverter.toInternalOrNull(anotherValue);
+			}
+
 			for (Map.Entry<Class<?>, Object> to : testSpec.conversions.entrySet()) {
 				final DataType toDataType = testSpec.dataType.bridgedTo(to.getKey());
 
@@ -395,12 +406,15 @@ public class DataStructureConvertersTest {
 
 		private final Map<Class<?>, Object> conversions;
 
+		private final Map<Class<?>, Object> conversionsWithAnotherValue;
+
 		private @Nullable String expectedErrorMessage;
 
 		private TestSpec(String description, DataType dataType) {
 			this.description = description;
 			this.dataType = dataType;
 			this.conversions = new LinkedHashMap<>();
+			this.conversionsWithAnotherValue = new LinkedHashMap<>();
 		}
 
 		static TestSpec forDataType(AbstractDataType<?> dataType) {
@@ -420,6 +434,11 @@ public class DataStructureConvertersTest {
 			return this;
 		}
 
+		<T> TestSpec convertedToWithAnotherValue(Class<T> clazz, T value) {
+			conversionsWithAnotherValue.put(clazz, value);
+			return this;
+		}
+
 		<T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> supplier) {
 			conversions.put(clazz, supplier.get());
 			return this;