You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:29 UTC
[flink] 05/07: [hotfix] Migrate AvroTypesITCase to blink planner
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb0201d88f2645a8c0317517f3e1085892dd432f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 19:28:33 2020 +0200
[hotfix] Migrate AvroTypesITCase to blink planner
---
.../avro/utils/AvroKryoSerializerUtils.java | 2 +-
.../flink/table/runtime/batch/AvroTypesITCase.java | 78 ++++++++++++++--------
2 files changed, 51 insertions(+), 29 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index b871dbc..ba125c2 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -126,7 +126,7 @@ public class AvroKryoSerializerUtils extends AvroUtils {
output.writeInt(localDate.getDayOfMonth());
final Chronology chronology = localDate.getChronology();
- if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+ if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
throw new RuntimeException("Unsupported chronology: " + chronology);
}
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 7dfe9e3..4d93f6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -18,20 +18,22 @@
package org.apache.flink.table.runtime.batch;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
import org.apache.avro.util.Utf8;
import org.joda.time.DateTime;
@@ -48,6 +50,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
import static org.junit.Assert.assertEquals;
@@ -147,15 +150,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
@Test
public void testAvroToRow() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
- Table t = tEnv.fromDataSet(testData(env));
+ Table t = tEnv.fromDataStream(testData(env));
Table result = t.select($("*"));
- List<Row> results = tEnv.toDataSet(result, Row.class).collect();
+ List<Row> results = CollectionUtil.iteratorToList(
+ DataStreamUtils.collect(
+ tEnv.toAppendStream(
+ result,
+ Row.class)));
String expected =
"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
@@ -163,8 +170,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
- "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
- "\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
+ // TODO we should get an Avro record here instead of a nested row. This should be fixed
+ // with FLIP-136
+ "Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
"2014-03-01T12:12:12.321Z,null\n" +
"yellow,null,Terminator,[false],[world],false," +
"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
@@ -176,12 +184,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
@Test
public void testAvroStringAccess() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
- Table t = tEnv.fromDataSet(testData(env));
+ Table t = tEnv.fromDataStream(testData(env));
Table result = t.select($("name"));
- List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
+ List<Utf8> results = CollectionUtil.iteratorToList(result.execute().collect())
+ .stream()
+ .map(row -> (Utf8) row.getField(0))
+ .collect(Collectors.toList());
+
String expected = "Charlie\n" +
"Terminator\n" +
"Whatever";
@@ -190,38 +202,48 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
@Test
public void testAvroObjectAccess() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+ env,
+ EnvironmentSettings.newInstance().useBlinkPlanner().build());
- Table t = tEnv.fromDataSet(testData(env));
+ Table t = tEnv.fromDataStream(testData(env));
Table result = t
.filter($("type_nested").isNotNull())
.select($("type_nested").flatten())
.as("city", "num", "state", "street", "zip");
- List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
+ List<Address> results = CollectionUtil.iteratorToList(
+ DataStreamUtils.collect(
+ tEnv.toAppendStream(
+ result,
+ Address.class)));
String expected = USER_1.getTypeNested().toString();
TestBaseUtils.compareResultAsText(results, expected);
}
@Test
public void testAvroToAvro() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+ env,
+ EnvironmentSettings.newInstance().useBlinkPlanner().build());
- Table t = tEnv.fromDataSet(testData(env));
+ Table t = tEnv.fromDataStream(testData(env));
Table result = t.select($("*"));
- List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
+ List<User> results = CollectionUtil.iteratorToList(
+ DataStreamUtils.collect(
+ tEnv.toAppendStream(result, User.class)));
List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
assertEquals(expected, results);
}
- private DataSet<User> testData(ExecutionEnvironment env) {
- List<User> data = new ArrayList<>(3);
- data.add(USER_1);
- data.add(USER_2);
- data.add(USER_3);
- return env.fromCollection(data);
+ private DataStream<User> testData(StreamExecutionEnvironment env) {
+ return env.fromElements(
+ USER_1,
+ USER_2,
+ USER_3
+ );
}
}