Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/08 02:14:53 UTC

[flink] branch release-1.11 updated (26d7a12 -> e9a49b7)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 26d7a12  [FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring
     new 2dc5e08  [FLINK-18073][avro] Fix AvroRowDataSerializationSchema is not serializable
     new 85695e9  [hotfix][avro] Fix AvroRowSerializationSchema doesn't support TIMESTAMP type
     new e9a49b7  [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink-connector-kafka-0.10/pom.xml             | 13 +++
 .../flink-connector-kafka-0.11/pom.xml             | 13 +++
 .../connectors/kafka/table/KafkaTableTestBase.java | 81 ++++++++++++++-----
 flink-connectors/flink-connector-kafka/pom.xml     | 13 +++
 .../formats/avro/AvroFileSystemFormatFactory.java  |  3 +-
 .../avro/AvroRowDataSerializationSchema.java       | 92 +++++++++++++++++-----
 .../formats/avro/AvroRowSerializationSchema.java   |  9 +++
 .../avro/typeutils/AvroSchemaConverter.java        | 23 +++---
 .../avro/typeutils/AvroSchemaConverterTest.java    | 42 ++++++++++
 9 files changed, 236 insertions(+), 53 deletions(-)


[flink] 01/03: [FLINK-18073][avro] Fix AvroRowDataSerializationSchema is not serializable

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2dc5e0808556c35b0818a1626e8d57f4dde12f45
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 20:01:47 2020 +0800

    [FLINK-18073][avro] Fix AvroRowDataSerializationSchema is not serializable
    
    This closes #12471
---
 .../formats/avro/AvroFileSystemFormatFactory.java  |  3 +-
 .../avro/AvroRowDataSerializationSchema.java       | 92 +++++++++++++++++-----
 .../avro/typeutils/AvroSchemaConverter.java        | 23 +++---
 .../avro/typeutils/AvroSchemaConverterTest.java    | 42 ++++++++++
 4 files changed, 127 insertions(+), 33 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
index a033739..c60c42c 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java
@@ -243,11 +243,12 @@ public class AvroFileSystemFormatFactory implements FileSystemFormatFactory {
 			BulkWriter<GenericRecord> writer = factory.create(out);
 			AvroRowDataSerializationSchema.SerializationRuntimeConverter converter =
 					AvroRowDataSerializationSchema.createRowConverter(rowType);
+			Schema schema = AvroSchemaConverter.convertToSchema(rowType);
 			return new BulkWriter<RowData>() {
 
 				@Override
 				public void addElement(RowData element) throws IOException {
-					GenericRecord record = (GenericRecord) converter.convert(element);
+					GenericRecord record = (GenericRecord) converter.convert(schema, element);
 					writer.addElement(record);
 				}
 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
index 00b7ac5..5b1fbbe 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
@@ -75,6 +75,11 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	private final SerializationRuntimeConverter runtimeConverter;
 
 	/**
+	 * Avro serialization schema.
+	 */
+	private transient Schema schema;
+
+	/**
 	 * Writer to serialize Avro record into a Avro bytes.
 	 */
 	private transient DatumWriter<IndexedRecord> datumWriter;
@@ -99,7 +104,7 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 
 	@Override
 	public void open(InitializationContext context) throws Exception {
-		final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+		this.schema = AvroSchemaConverter.convertToSchema(rowType);
 		datumWriter = new SpecificDatumWriter<>(schema);
 		arrayOutputStream = new ByteArrayOutputStream();
 		encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
@@ -109,7 +114,7 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	public byte[] serialize(RowData row) {
 		try {
 			// convert to record
-			final GenericRecord record = (GenericRecord) runtimeConverter.convert(row);
+			final GenericRecord record = (GenericRecord) runtimeConverter.convert(schema, row);
 			arrayOutputStream.reset();
 			datumWriter.write(record, encoder);
 			encoder.flush();
@@ -145,33 +150,43 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 	 * to corresponding Avro data structures.
 	 */
 	interface SerializationRuntimeConverter extends Serializable {
-		Object convert(Object object);
+		Object convert(Schema schema, Object object);
 	}
 
 	static SerializationRuntimeConverter createRowConverter(RowType rowType) {
 		final SerializationRuntimeConverter[] fieldConverters = rowType.getChildren().stream()
 			.map(AvroRowDataSerializationSchema::createConverter)
 			.toArray(SerializationRuntimeConverter[]::new);
-		final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
 		final LogicalType[] fieldTypes = rowType.getFields().stream()
 			.map(RowType.RowField::getType)
 			.toArray(LogicalType[]::new);
+		final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+		}
 		final int length = rowType.getFieldCount();
 
-		return object -> {
+		return (schema, object) -> {
 			final RowData row = (RowData) object;
+			final List<Schema.Field> fields = schema.getFields();
 			final GenericRecord record = new GenericData.Record(schema);
 			for (int i = 0; i < length; ++i) {
-				record.put(i, fieldConverters[i].convert(RowData.get(row, i, fieldTypes[i])));
+				final Schema.Field schemaField = fields.get(i);
+				Object avroObject = fieldConverters[i].convert(
+					schemaField.schema(),
+					fieldGetters[i].getFieldOrNull(row));
+				record.put(i, avroObject);
 			}
 			return record;
 		};
 	}
 
 	private static SerializationRuntimeConverter createConverter(LogicalType type) {
+		final SerializationRuntimeConverter converter;
 		switch (type.getTypeRoot()) {
 			case NULL:
-				return object -> null;
+				converter = (schema, object) -> null;
+				break;
 			case BOOLEAN: // boolean
 			case INTEGER: // int
 			case INTERVAL_YEAR_MONTH: // long
@@ -181,39 +196,74 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 			case DOUBLE: // double
 			case TIME_WITHOUT_TIME_ZONE: // int
 			case DATE: // int
-				return avroObject -> avroObject;
+				converter = (schema, object) -> object;
+				break;
 			case CHAR:
 			case VARCHAR:
-				return object -> new Utf8(object.toString());
+				converter = (schema, object) -> new Utf8(object.toString());
+				break;
 			case BINARY:
 			case VARBINARY:
-				return object -> ByteBuffer.wrap((byte[]) object);
+				converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
+				break;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
-				return object -> ((TimestampData) object).toTimestamp().getTime();
+				converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+				break;
 			case DECIMAL:
-				return object -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+				converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+				break;
 			case ARRAY:
-				return createArrayConverter((ArrayType) type);
+				converter = createArrayConverter((ArrayType) type);
+				break;
 			case ROW:
-				return createRowConverter((RowType) type);
+				converter = createRowConverter((RowType) type);
+				break;
 			case MAP:
 			case MULTISET:
-				return createMapConverter(type);
+				converter = createMapConverter(type);
+				break;
 			case RAW:
 			default:
 				throw new UnsupportedOperationException("Unsupported type: " + type);
 		}
+
+		// wrap into nullable converter
+		return (schema, object) -> {
+			if (object == null) {
+				return null;
+			}
+
+			// get actual schema if it is a nullable schema
+			Schema actualSchema;
+			if (schema.getType() == Schema.Type.UNION) {
+				List<Schema> types = schema.getTypes();
+				int size = types.size();
+				if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(0);
+				} else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(1);
+				} else {
+					throw new IllegalArgumentException(
+						"The Avro schema is not a nullable type: " + schema.toString());
+				}
+			} else {
+				actualSchema = schema;
+			}
+			return converter.convert(actualSchema, object);
+		};
 	}
 
 	private static SerializationRuntimeConverter createArrayConverter(ArrayType arrayType) {
+		LogicalType elementType = arrayType.getElementType();
+		final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
 		final SerializationRuntimeConverter elementConverter = createConverter(arrayType.getElementType());
-		final LogicalType elementType = arrayType.getElementType();
 
-		return object -> {
+		return (schema, object) -> {
+			final Schema elementSchema = schema.getElementType();
 			ArrayData arrayData = (ArrayData) object;
 			List<Object> list = new ArrayList<>();
 			for (int i = 0; i < arrayData.size(); ++i) {
-				list.add(elementConverter.convert(ArrayData.get(arrayData, i, elementType)));
+				list.add(elementConverter.convert(elementSchema, elementGetter.getElementOrNull(arrayData, i)));
 			}
 			return list;
 		};
@@ -221,16 +271,18 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 
 	private static SerializationRuntimeConverter createMapConverter(LogicalType type) {
 		LogicalType valueType = extractValueTypeToAvroMap(type);
+		final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
 		final SerializationRuntimeConverter valueConverter = createConverter(valueType);
 
-		return object -> {
+		return (schema, object) -> {
+			final Schema valueSchema = schema.getValueType();
 			final MapData mapData = (MapData) object;
 			final ArrayData keyArray = mapData.keyArray();
 			final ArrayData valueArray = mapData.valueArray();
 			final Map<Object, Object> map = new HashMap<>(mapData.size());
 			for (int i = 0; i < mapData.size(); ++i) {
 				final String key = keyArray.getString(i).toString();
-				final Object value = valueConverter.convert(ArrayData.get(valueArray, i, valueType));
+				final Object value = valueConverter.convert(valueSchema, valueGetter.getElementOrNull(valueArray, i));
 				map.put(key, value);
 			}
 			return map;
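
Note on the fix above: org.apache.avro.Schema is not java.io.Serializable, so the previous converters, which captured a Schema inside their lambda closures, made the whole serialization schema non-serializable. The change keeps the Schema out of converter state: it is rebuilt in open() into a transient field and handed to convert(schema, object) at call time. A minimal standalone sketch of that pattern (illustrative names, not Flink's API):

import java.io.Serializable;

// Sketch: a serializable converter that receives the non-serializable runtime
// object as an argument instead of capturing it in the lambda closure.
public class SerializableConverterSketch {

	// Stand-in for org.apache.avro.Schema: note it is NOT Serializable.
	static final class RuntimeSchema {
		final String name;
		RuntimeSchema(String name) { this.name = name; }
	}

	// Mirrors SerializationRuntimeConverter: the interface is Serializable,
	// and the schema travels as a call argument, not as captured state.
	interface Converter extends Serializable {
		Object convert(RuntimeSchema schema, Object value);
	}

	// Broken variant: the returned lambda captures `schema`, so serializing
	// the lambda would try to serialize RuntimeSchema and fail.
	static Converter capturing(RuntimeSchema schema) {
		return (ignored, value) -> schema.name + ":" + value;
	}

	// Fixed variant: no non-serializable state is captured; the schema is
	// supplied by the caller at convert() time (e.g. rebuilt in open()).
	static Converter parameterized() {
		return (schema, value) -> schema.name + ":" + value;
	}

	public static void main(String[] args) {
		RuntimeSchema schema = new RuntimeSchema("row_0");
		Converter converter = parameterized();
		System.out.println(converter.convert(schema, 42)); // row_0:42
	}
}
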
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 774fadf..37745e5 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.Row;
@@ -179,6 +180,7 @@ public class AvroSchemaConverter {
 	}
 
 	public static Schema convertToSchema(LogicalType logicalType, int rowTypeCounter) {
+		int precision;
 		switch (logicalType.getTypeRoot()) {
 			case NULL:
 				return SchemaBuilder.builder().nullType();
@@ -201,20 +203,25 @@ public class AvroSchemaConverter {
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 				// use long to represents Timestamp
 				final TimestampType timestampType = (TimestampType) logicalType;
-				int precision = timestampType.getPrecision();
+				precision = timestampType.getPrecision();
 				org.apache.avro.LogicalType avroLogicalType;
 				if (precision <= 3) {
 					avroLogicalType = LogicalTypes.timestampMillis();
 				} else {
-					throw new IllegalArgumentException("Avro Timestamp does not support Timestamp with precision: " +
-						precision +
-						", it only supports precision of 3 or 9.");
+					throw new IllegalArgumentException("Avro does not support TIMESTAMP type " +
+						"with precision: " + precision + ", it only supports precision less than 3.");
 				}
 				return avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
 			case DATE:
 				// use int to represents Date
 				return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
 			case TIME_WITHOUT_TIME_ZONE:
+				precision = ((TimeType) logicalType).getPrecision();
+				if (precision > 3) {
+					throw new IllegalArgumentException(
+						"Avro does not support TIME type with precision: " + precision +
+						", it only supports precision less than 3.");
+				}
 				// use int to represents Time, we only support millisecond when deserialization
 				return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
 			case DECIMAL:
@@ -254,14 +261,6 @@ public class AvroSchemaConverter {
 					.array()
 					.items(convertToSchema(arrayType.getElementType(), rowTypeCounter));
 			case RAW:
-				// if the union type has more than 2 types, it will be recognized a generic type
-				// see AvroRowDeserializationSchema#convertAvroType and AvroRowSerializationSchema#convertFlinkType
-				return SchemaBuilder.builder().unionOf()
-					.nullType().and()
-					.booleanType().and()
-					.longType().and()
-					.doubleType()
-					.endUnion();
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 			default:
 				throw new UnsupportedOperationException("Unsupported to derive Schema for type: " + logicalType);
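
The TIMESTAMP and TIME checks above exist because Avro's timestamp-millis and time-millis logical types only carry millisecond precision. A rough usage sketch, mirroring the tests added below; the exact schema output (for example nullable union wrapping) depends on the declared nullability of the type:

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

public class PrecisionCheckSketch {
	public static void main(String[] args) {
		// TIMESTAMP(3) maps onto Avro's timestamp-millis logical type.
		Schema ok = AvroSchemaConverter.convertToSchema(new TimestampType(3));
		System.out.println(ok); // a long-based schema carrying timestamp-millis

		// TIMESTAMP(9) and TIME(6) exceed millisecond precision and are rejected.
		try {
			AvroSchemaConverter.convertToSchema(new TimestampType(9));
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage());
		}
		try {
			AvroSchemaConverter.convertToSchema(new TimeType(6));
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage());
		}
	}
}
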
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index be0ddc4..fa499b7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -22,9 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -34,6 +39,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class AvroSchemaConverterTest {
 
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
 	@Test
 	public void testAvroClassConversion() {
 		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
@@ -45,6 +53,40 @@ public class AvroSchemaConverterTest {
 		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
 	}
 
+	@Test
+	public void testInvalidRawTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.RAW(Types.GENERIC(AvroSchemaConverterTest.class)))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(UnsupportedOperationException.class);
+		thrown.expectMessage("Unsupported to derive Schema for type: RAW");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
+	@Test
+	public void testInvalidTimestampTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.TIMESTAMP(9))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Avro does not support TIMESTAMP type with precision: 9, " +
+			"it only supports precision less than 3.");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
+	@Test
+	public void testInvalidTimeTypeAvroSchemaConversion() {
+		RowType rowType = (RowType) TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.TIME(6))
+			.build().toRowDataType().getLogicalType();
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Avro does not support TIME type with precision: 6, it only supports precision less than 3.");
+		AvroSchemaConverter.convertToSchema(rowType);
+	}
+
 	private void validateUserSchema(TypeInformation<?> actual) {
 		final TypeInformation<Row> address = Types.ROW_NAMED(
 			new String[]{
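
The wrapped converters in AvroRowDataSerializationSchema above assume that a nullable Flink type maps to a two-branch Avro union containing NULL, and they hand the non-null branch to the inner converter. A minimal sketch of just that unwrapping step (helper name is illustrative):

import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Sketch: reduce a 2-type union containing NULL to its non-null branch so an
// inner converter always sees a concrete schema.
public class NullableUnionSketch {

	static Schema unwrapNullable(Schema schema) {
		if (schema.getType() != Schema.Type.UNION) {
			return schema;
		}
		List<Schema> types = schema.getTypes();
		if (types.size() == 2 && types.get(1).getType() == Schema.Type.NULL) {
			return types.get(0);
		} else if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL) {
			return types.get(1);
		}
		throw new IllegalArgumentException("Not a nullable Avro schema: " + schema);
	}

	public static void main(String[] args) {
		Schema nullableLong = SchemaBuilder.builder()
			.unionOf().nullType().and().longType().endUnion();
		System.out.println(unwrapNullable(nullableLong)); // "long"
	}
}
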


[flink] 03/03: [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9a49b7da4d7edcad8ca9c991da3fe1ab3547b9e
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 20:02:21 2020 +0800

    [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)
    
    This closes #12471
---
 .../flink-connector-kafka-0.10/pom.xml             | 13 ++++
 .../flink-connector-kafka-0.11/pom.xml             | 13 ++++
 .../connectors/kafka/table/KafkaTableTestBase.java | 81 ++++++++++++++++------
 flink-connectors/flink-connector-kafka/pom.xml     | 13 ++++
 4 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 71c17ff..7e5d375 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<!-- Required for org.apache.flink.streaming.connectors.kafka.Kafka010SecuredRunITCase -->
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 534d2d8..b33a86f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index c1dc7ce..49d0269 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -46,17 +46,26 @@ import static org.junit.Assert.assertEquals;
 @RunWith(Parameterized.class)
 public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
+	private static final String JSON_FORMAT = "json";
+	private static final String AVRO_FORMAT = "avro";
+	private static final String CSV_FORMAT = "csv";
+
 	@Parameterized.Parameter
 	public boolean isLegacyConnector;
 
 	@Parameterized.Parameter(1)
-	public int topicID;
+	public String format;
 
-	@Parameterized.Parameters(name = "legacy = {0}, topicId = {1}")
+	@Parameterized.Parameters(name = "legacy = {0}, format = {1}")
 	public static Object[] parameters() {
 		return new Object[][]{
-			new Object[]{true, 0},
-			new Object[]{false, 1}
+			// cover all 3 formats for new and old connector
+			new Object[]{false, JSON_FORMAT},
+			new Object[]{false, AVRO_FORMAT},
+			new Object[]{false, CSV_FORMAT},
+			new Object[]{true, JSON_FORMAT},
+			new Object[]{true, AVRO_FORMAT},
+			new Object[]{true, CSV_FORMAT}
 		};
 	}
 
@@ -87,7 +96,9 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
 	@Test
 	public void testKafkaSourceSink() throws Exception {
-		final String topic = "tstopic" + topicID;
+		// we always use a different topic name for each parameterized topic,
+		// in order to make sure the topic can be created.
+		final String topic = "tstopic_" + format + "_" + isLegacyConnector;
 		createTestTopic(topic, 1, 1);
 
 		// ---------- Produce an event time stream into Kafka -------------------
@@ -101,6 +112,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  `computed-price` as price + 1.0,\n" +
 					"  price decimal(38, 18),\n" +
 					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
 					"  log_ts timestamp(3),\n" +
 					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 					"  watermark for ts as ts\n" +
@@ -110,18 +123,21 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  'properties.bootstrap.servers' = '%s',\n" +
 					"  'properties.group.id' = '%s',\n" +
 					"  'scan.startup.mode' = 'earliest-offset',\n" +
-					"  'format' = 'json'\n" +
+					"  %s\n" +
 					")",
 				factoryIdentifier(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		} else {
 			createTable = String.format(
 				"create table kafka (\n" +
 					"  `computed-price` as price + 1.0,\n" +
 					"  price decimal(38, 18),\n" +
 					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
 					"  log_ts timestamp(3),\n" +
 					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 					"  watermark for ts as ts\n" +
@@ -132,32 +148,36 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  'connector.properties.bootstrap.servers' = '%s',\n" +
 					"  'connector.properties.group.id' = '%s',\n" +
 					"  'connector.startup-mode' = 'earliest-offset',\n" +
-					"  'format.type' = 'json',\n" +
-					"  'update-mode' = 'append'\n" +
+					"  'update-mode' = 'append',\n" +
+					"  %s\n" +
 					")",
 				kafkaVersion(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		}
 
 		tEnv.executeSql(createTable);
 
 		String initialValues = "INSERT INTO kafka\n" +
-			"SELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\n" +
-			"FROM (VALUES (2.02,'Euro','2019-12-12 00:00:00.001001'), \n" +
-			"  (1.11,'US Dollar','2019-12-12 00:00:01.002001'), \n" +
-			"  (50,'Yen','2019-12-12 00:00:03.004001'), \n" +
-			"  (3.1,'Euro','2019-12-12 00:00:04.005001'), \n" +
-			"  (5.33,'US Dollar','2019-12-12 00:00:05.006001'), \n" +
-			"  (0,'DUMMY','2019-12-12 00:00:10'))\n" +
-			"  AS orders (price, currency, ts)";
+			"SELECT CAST(price AS DECIMAL(10, 2)), currency, " +
+			" CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" +
+			"FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n" +
+			"  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n" +
+			"  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n" +
+			"  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n" +
+			"  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n" +
+			"  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n" +
+			"  AS orders (price, currency, d, t, ts)";
 		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, initialValues);
 
 		// ---------- Consume stream from Kafka -------------------
 
 		String query = "SELECT\n" +
 			"  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n" +
+			"  CAST(MAX(log_date) AS VARCHAR),\n" +
+			"  CAST(MAX(log_time) AS VARCHAR),\n" +
 			"  CAST(MAX(ts) AS VARCHAR),\n" +
 			"  COUNT(*),\n" +
 			"  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
@@ -180,8 +200,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		}
 
 		List<String> expected = Arrays.asList(
-			"+I(2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00)",
-			"+I(2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33)");
+			"+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)",
+			"+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)");
 
 		assertEquals(expected, TestingSinkFunction.rows);
 
@@ -190,6 +210,27 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		deleteTestTopic(topic);
 	}
 
+	private String formatOptions() {
+		if (!isLegacyConnector) {
+			return String.format("'format' = '%s'", format);
+		} else {
+			String formatType = String.format("'format.type' = '%s'", format);
+			if (format.equals(AVRO_FORMAT)) {
+				// legacy connector requires to specify avro-schema
+				String avroSchema = "{\"type\":\"record\",\"name\":\"row_0\",\"fields\":" +
+					"[{\"name\":\"price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\"," +
+					"\"precision\":38,\"scale\":18}},{\"name\":\"currency\",\"type\":[\"string\"," +
+					"\"null\"]},{\"name\":\"log_date\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"date\"}},{\"name\":\"log_time\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"time-millis\"}},{\"name\":\"log_ts\",\"type\":{\"type\":\"long\"," +
+					"\"logicalType\":\"timestamp-millis\"}}]}";
+				return formatType + String.format(", 'format.avro-schema' = '%s'", avroSchema);
+			} else {
+				return formatType;
+			}
+		}
+	}
+
 	private static final class TestingSinkFunction implements SinkFunction<RowData> {
 
 		private static final long serialVersionUID = 455430015321124493L;
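
The parameterization above replaces the numeric topicID with a (connector flavour, format) pair so each format is exercised against both the new and the legacy connector, and formatOptions() assembles the matching DDL snippet. A stripped-down, standalone sketch of that pattern (not the Flink test itself; names are illustrative):

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Sketch: each run picks a connector flavour and a format, and the DDL's format
// options are assembled per parameter instead of being hard-coded.
@RunWith(Parameterized.class)
public class FormatParameterizationSketch {

	@Parameterized.Parameter
	public boolean isLegacyConnector;

	@Parameterized.Parameter(1)
	public String format;

	@Parameterized.Parameters(name = "legacy = {0}, format = {1}")
	public static Collection<Object[]> parameters() {
		return Arrays.asList(
			new Object[]{false, "json"},
			new Object[]{false, "avro"},
			new Object[]{false, "csv"},
			new Object[]{true, "json"},
			new Object[]{true, "avro"},
			new Object[]{true, "csv"});
	}

	private String formatOptions() {
		// New connector uses 'format', legacy connector uses 'format.type'.
		return isLegacyConnector
			? String.format("'format.type' = '%s'", format)
			: String.format("'format' = '%s'", format);
	}

	@Test
	public void printsDistinctTopicAndOptions() {
		// A distinct topic per parameter combination avoids clashes between runs.
		String topic = "tstopic_" + format + "_" + isLegacyConnector;
		System.out.println(topic + " -> " + formatOptions());
	}
}
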
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 1d97d2d..39f5444 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -196,12 +196,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>
 


[flink] 02/03: [hotfix][avro] Fix AvroRowSerializationSchema doesn't support TIMESTAMP type

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85695e9cc9d8a4642d2998e730f99b11c05a4b2b
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 20:02:13 2020 +0800

    [hotfix][avro] Fix AvroRowSerializationSchema doesn't support TIMESTAMP type
---
 .../apache/flink/formats/avro/AvroRowSerializationSchema.java    | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index d4c73197..30f9754 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -48,6 +48,9 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,14 +245,20 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 				// check for logical types
 				if (object instanceof Date) {
 					return convertFromDate(schema, (Date) object);
+				} else if (object instanceof LocalDate) {
+					return convertFromDate(schema, Date.valueOf((LocalDate) object));
 				} else if (object instanceof Time) {
 					return convertFromTime(schema, (Time) object);
+				} else if (object instanceof LocalTime) {
+					return convertFromTime(schema, Time.valueOf((LocalTime) object));
 				}
 				return object;
 			case LONG:
 				// check for logical type
 				if (object instanceof Timestamp) {
 					return convertFromTimestamp(schema, (Timestamp) object);
+				} else if (object instanceof LocalDateTime) {
+					return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
 				}
 				return object;
 			case FLOAT:
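
Note on the hotfix above: the row handed to AvroRowSerializationSchema may carry java.time values (LocalDate, LocalTime, LocalDateTime) rather than java.sql ones, and the added branches funnel them through the existing java.sql-based conversion paths. A minimal sketch of that bridging in isolation (helper name is illustrative):

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Sketch: normalize java.time temporals to their java.sql counterparts so the
// existing Date/Time/Timestamp conversion code can handle them.
public class JavaTimeBridgingSketch {

	static Object toSqlTemporal(Object object) {
		if (object instanceof LocalDate) {
			return Date.valueOf((LocalDate) object);
		} else if (object instanceof LocalTime) {
			return Time.valueOf((LocalTime) object);
		} else if (object instanceof LocalDateTime) {
			return Timestamp.valueOf((LocalDateTime) object);
		}
		return object; // already a java.sql type or a non-temporal value
	}

	public static void main(String[] args) {
		System.out.println(toSqlTemporal(LocalDateTime.of(2019, 12, 12, 0, 0, 1)));
		System.out.println(toSqlTemporal(LocalDate.of(2019, 12, 12)));
		System.out.println(toSqlTemporal(LocalTime.of(0, 0, 3)));
	}
}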