You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/25 07:05:44 UTC
flink git commit: [FLINK-9384] [table] Fix KafkaAvroTableSource type mismatch
Repository: flink
Updated Branches:
refs/heads/master cb48019ff -> bc8d1b1f6
[FLINK-9384] [table] Fix KafkaAvroTableSource type mismatch
This closes #6026.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc8d1b1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc8d1b1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc8d1b1f
Branch: refs/heads/master
Commit: bc8d1b1f61deff51f131ab243f25f676f465e240
Parents: cb48019
Author: jerryjzhang <zh...@163.com>
Authored: Thu May 24 21:24:49 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Fri May 25 09:02:08 2018 +0200
----------------------------------------------------------------------
.../kafka/KafkaAvroTableSourceTestBase.java | 9 +++++++++
.../formats/avro/AvroRowDeserializationSchema.java | 16 +++++++++++++++-
.../avro/AvroRowDeSerializationSchemaTest.java | 15 ++++++++-------
.../flink/formats/avro/utils/AvroTestUtils.java | 9 +++++----
4 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index 557a20f..16beb7d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.avro.Schema;
@@ -72,6 +73,10 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
// check field mapping
assertNull(source.getFieldMapping());
+
+ // check if DataStream type matches with TableSource.getReturnType()
+ assertEquals(source.getReturnType(),
+ source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
@Test
@@ -117,6 +122,10 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
assertEquals("otherField1", fieldMapping.get("field1"));
assertEquals("otherField2", fieldMapping.get("field2"));
assertEquals("otherField3", fieldMapping.get("field3"));
+
+ // check if DataStream type matches with TableSource.getReturnType()
+ assertEquals(source.getReturnType(),
+ source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index a8422a4..276257a 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -18,6 +18,8 @@
package org.apache.flink.formats.avro;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -30,6 +32,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import java.io.IOException;
@@ -77,11 +80,16 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
private SpecificRecord record;
/**
+ * Type information describing the result type.
+ */
+ private transient TypeInformation<Row> typeInfo;
+
+ /**
* Creates a Avro deserialization schema for the given record.
*
* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
*/
- public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ public AvroRowDeserializationSchema(Class<? extends SpecificRecordBase> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
this.schema = SpecificData.get().getSchema(recordClazz);
@@ -89,6 +97,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
this.inputStream = new MutableByteArrayInputStream();
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ this.typeInfo = AvroRecordClassConverter.convert(recordClazz);
}
@Override
@@ -120,6 +129,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return typeInfo;
+ }
+
/**
* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
* Avro's {@link Utf8} fields are converted into regular Java strings.
http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 5341bcf..1d98c14 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;
import java.io.IOException;
@@ -37,7 +38,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testSerializeDeserializeSimpleRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -50,7 +51,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testSerializeSimpleRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -65,7 +66,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testDeserializeRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -80,7 +81,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testSerializeDeserializeComplexRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -93,7 +94,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testSerializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -108,7 +109,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testDeserializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -123,7 +124,7 @@ public class AvroRowDeSerializationSchemaTest {
@Test
public void testSerializability() throws IOException, ClassNotFoundException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index e6a2021..ce23ccc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -70,7 +71,7 @@ public final class AvroTestUtils {
/**
* Tests a simple Avro data types without nesting.
*/
- public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() {
+ public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getSimpleTestData() {
final Address addr = Address.newBuilder()
.setNum(42)
.setStreet("Main Street 42")
@@ -86,7 +87,7 @@ public final class AvroTestUtils {
rowAddr.setField(3, "Test State");
rowAddr.setField(4, "12345");
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
t.f0 = Address.class;
t.f1 = addr;
t.f2 = rowAddr;
@@ -97,7 +98,7 @@ public final class AvroTestUtils {
/**
* Tests all Avro data types as well as nested types.
*/
- public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() {
+ public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getComplexTestData() {
final Address addr = Address.newBuilder()
.setNum(42)
.setStreet("Main Street 42")
@@ -148,7 +149,7 @@ public final class AvroTestUtils {
rowUser.setField(13, null);
rowUser.setField(14, rowAddr);
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+ final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
t.f0 = User.class;
t.f1 = user;
t.f2 = rowUser;