You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/25 07:05:44 UTC

flink git commit: [FLINK-9384] [table] Fix KafkaAvroTableSource type mismatch

Repository: flink
Updated Branches:
  refs/heads/master cb48019ff -> bc8d1b1f6


[FLINK-9384] [table] Fix KafkaAvroTableSource type mismatch

This closes #6026.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc8d1b1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc8d1b1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc8d1b1f

Branch: refs/heads/master
Commit: bc8d1b1f61deff51f131ab243f25f676f465e240
Parents: cb48019
Author: jerryjzhang <zh...@163.com>
Authored: Thu May 24 21:24:49 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Fri May 25 09:02:08 2018 +0200

----------------------------------------------------------------------
 .../kafka/KafkaAvroTableSourceTestBase.java         |  9 +++++++++
 .../formats/avro/AvroRowDeserializationSchema.java  | 16 +++++++++++++++-
 .../avro/AvroRowDeSerializationSchemaTest.java      | 15 ++++++++-------
 .../flink/formats/avro/utils/AvroTestUtils.java     |  9 +++++----
 4 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index 557a20f..16beb7d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Types;
 
 import org.apache.avro.Schema;
@@ -72,6 +73,10 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 
 		// check field mapping
 		assertNull(source.getFieldMapping());
+
+		// check if DataStream type matches with TableSource.getReturnType()
+		assertEquals(source.getReturnType(),
+			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
 	}
 
 	@Test
@@ -117,6 +122,10 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		assertEquals("otherField1", fieldMapping.get("field1"));
 		assertEquals("otherField2", fieldMapping.get("field2"));
 		assertEquals("otherField3", fieldMapping.get("field3"));
+
+		// check if DataStream type matches with TableSource.getReturnType()
+		assertEquals(source.getReturnType(),
+			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index a8422a4..276257a 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -18,6 +18,8 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
@@ -30,6 +32,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.util.Utf8;
 
 import java.io.IOException;
@@ -77,11 +80,16 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	private SpecificRecord record;
 
 	/**
+	 * Type information describing the result type.
+	 */
+	private transient TypeInformation<Row> typeInfo;
+
+	/**
 	 * Creates a Avro deserialization schema for the given record.
 	 *
 	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
 	 */
-	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+	public AvroRowDeserializationSchema(Class<? extends SpecificRecordBase> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
 		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
@@ -89,6 +97,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
 		this.inputStream = new MutableByteArrayInputStream();
 		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		this.typeInfo = AvroRecordClassConverter.convert(recordClazz);
 	}
 
 	@Override
@@ -120,6 +129,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 	}
 
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return typeInfo;
+	}
+
 	/**
 	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
 	 * Avro's {@link Utf8} fields are converted into regular Java strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 5341bcf..1d98c14 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,7 +38,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testSerializeDeserializeSimpleRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -50,7 +51,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testSerializeSimpleRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -65,7 +66,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testDeserializeRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -80,7 +81,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testSerializeDeserializeComplexRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -93,7 +94,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testSerializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -108,7 +109,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testDeserializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -123,7 +124,7 @@ public class AvroRowDeSerializationSchemaTest {
 
 	@Test
 	public void testSerializability() throws IOException, ClassNotFoundException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
 
 		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d1b1f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index e6a2021..ce23ccc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -70,7 +71,7 @@ public final class AvroTestUtils {
 	/**
 	 * Tests a simple Avro data types without nesting.
 	 */
-	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() {
+	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getSimpleTestData() {
 		final Address addr = Address.newBuilder()
 			.setNum(42)
 			.setStreet("Main Street 42")
@@ -86,7 +87,7 @@ public final class AvroTestUtils {
 		rowAddr.setField(3, "Test State");
 		rowAddr.setField(4, "12345");
 
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
 		t.f0 = Address.class;
 		t.f1 = addr;
 		t.f2 = rowAddr;
@@ -97,7 +98,7 @@ public final class AvroTestUtils {
 	/**
 	 * Tests all Avro data types as well as nested types.
 	 */
-	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() {
+	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getComplexTestData() {
 		final Address addr = Address.newBuilder()
 			.setNum(42)
 			.setStreet("Main Street 42")
@@ -148,7 +149,7 @@ public final class AvroTestUtils {
 		rowUser.setField(13, null);
 		rowUser.setField(14, rowAddr);
 
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
 		t.f0 = User.class;
 		t.f1 = user;
 		t.f2 = rowUser;