You are viewing a plain text version of this content. The canonical link to the original message is available in the HTML version of the archive.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:29 UTC

[flink] 05/07: [hotfix] Migrate AvroTypesITCase to blink planner

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb0201d88f2645a8c0317517f3e1085892dd432f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 19:28:33 2020 +0200

    [hotfix] Migrate AvroTypesITCase to blink planner
---
 .../avro/utils/AvroKryoSerializerUtils.java        |  2 +-
 .../flink/table/runtime/batch/AvroTypesITCase.java | 78 ++++++++++++++--------
 2 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index b871dbc..ba125c2 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -126,7 +126,7 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			output.writeInt(localDate.getDayOfMonth());
 
 			final Chronology chronology = localDate.getChronology();
-			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+			if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
 				throw new RuntimeException("Unsupported chronology: " + chronology);
 			}
 		}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 7dfe9e3..4d93f6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.table.runtime.batch;
 
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
 import org.joda.time.DateTime;
@@ -48,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.junit.Assert.assertEquals;
@@ -147,15 +150,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroToRow() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
 		env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
 
-		List<Row> results = tEnv.toDataSet(result, Row.class).collect();
+		List<Row> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(
+					result,
+					Row.class)));
 		String expected =
 			"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
 			"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
@@ -163,8 +170,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
 			"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
-			"{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
-			"\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
+			// TODO we should get an Avro record here instead of a nested row. This should be fixed
+			// with FLIP-136
+			"Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
 			"2014-03-01T12:12:12.321Z,null\n" +
 			"yellow,null,Terminator,[false],[world],false," +
 			"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
@@ -176,12 +184,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroStringAccess() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("name"));
-		List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
+		List<Utf8> results = CollectionUtil.iteratorToList(result.execute().collect())
+			.stream()
+			.map(row -> (Utf8) row.getField(0))
+			.collect(Collectors.toList());
+
 		String expected = "Charlie\n" +
 				"Terminator\n" +
 				"Whatever";
@@ -190,38 +202,48 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroObjectAccess() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t
 				.filter($("type_nested").isNotNull())
 				.select($("type_nested").flatten())
 				.as("city", "num", "state", "street", "zip");
 
-		List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
+		List<Address> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(
+					result,
+					Address.class)));
 		String expected = USER_1.getTypeNested().toString();
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
 	@Test
 	public void testAvroToAvro() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
 
-		List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
+		List<User> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(result, User.class)));
 		List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
 		assertEquals(expected, results);
 	}
 
-	private DataSet<User> testData(ExecutionEnvironment env) {
-		List<User> data = new ArrayList<>(3);
-		data.add(USER_1);
-		data.add(USER_2);
-		data.add(USER_3);
-		return env.fromCollection(data);
+	private DataStream<User> testData(StreamExecutionEnvironment env) {
+		return env.fromElements(
+			USER_1,
+			USER_2,
+			USER_3
+		);
 	}
 }