Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:24 UTC

[flink] branch master updated (2a1da16 -> a6a4d16)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2a1da16  [hotfix] Fix checkstyle violations
     new e2b4264  [hotfix] Fix Pojo comparator field access
     new 09616cb  [hotfix] Extract joda conversions to a separate class
     new b025db6  [hotfix] Fix schema to DataType/Type conversion
     new 6ac8048  [hotfix] Fix time-micros and timestamp-micros handling
     new fb0201d  [hotfix] Migrate AvroTypesITCase to blink planner
     new 115f852  [FLINK-18192] Upgrade avro to 1.10
     new a6a4d16  [FLINK-18802] Create an uber jar for avro for sql-client

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/connectors/formats/avro.md          |   3 +-
 flink-connectors/flink-connector-hive/pom.xml      |  12 +++
 .../api/java/typeutils/runtime/PojoComparator.java |  10 +-
 .../src/main/avro/ComplexPayloadAvro.avsc          |   2 +-
 .../flink-end-to-end-tests-common-kafka/pom.xml    |   6 +-
 .../tests/util/kafka/SQLClientKafkaITCase.java     |   4 -
 .../flink-sql-client-test/pom.xml                  |   6 +-
 .../test-scripts/test_sql_client.sh                |   3 +-
 .../RegistryAvroRowDataSeDeSchemaTest.java         |   2 +-
 flink-formats/flink-avro/pom.xml                   |  31 ------
 .../formats/avro/AvroRowDeserializationSchema.java |  95 ++++++++++++----
 .../formats/avro/AvroRowSerializationSchema.java   |  35 ++++--
 .../formats/avro/AvroToRowDataConverters.java      |  55 ++++++----
 .../apache/flink/formats/avro/JodaConverter.java   |  67 ++++++++++++
 .../formats/avro/RowDataToAvroConverters.java      |   2 +-
 .../avro/typeutils/AvroSchemaConverter.java        |  25 +++--
 .../avro/utils/AvroKryoSerializerUtils.java        |  70 ------------
 .../flink/formats/avro/AvroOutputFormatITCase.java |  13 +--
 .../flink/formats/avro/AvroOutputFormatTest.java   |  14 +--
 .../formats/avro/AvroRecordInputFormatTest.java    |  19 ++--
 .../avro/AvroRowDataDeSerializationSchemaTest.java |  17 +--
 .../avro/AvroSplittableInputFormatTest.java        |  25 ++---
 .../flink/formats/avro/EncoderDecoderTest.java     |  13 +--
 .../RegistryAvroDeserializationSchemaTest.java     |   3 -
 .../avro/typeutils/AvroSchemaConverterTest.java    |  16 +--
 .../avro/typeutils/AvroTypeExtractionTest.java     |  71 +++++++-----
 .../flink/formats/avro/utils/AvroTestUtils.java    |  29 ++---
 .../formats/avro/utils/TestDataGenerator.java      |  14 +--
 .../flink/table/runtime/batch/AvroTypesITCase.java | 119 ++++++++++++---------
 .../flink-avro/src/test/resources/avro/user.avsc   |   4 +-
 .../{flink-sql-orc => flink-sql-avro}/pom.xml      |  47 ++++----
 .../src/main/resources/META-INF/NOTICE             |  12 +--
 flink-formats/pom.xml                              |   1 +
 pom.xml                                            |   5 +-
 34 files changed, 475 insertions(+), 375 deletions(-)
 create mode 100644 flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
 copy flink-formats/{flink-sql-orc => flink-sql-avro}/pom.xml (65%)
 copy flink-formats/{flink-avro-confluent-registry => flink-sql-avro}/src/main/resources/META-INF/NOTICE (69%)


[flink] 01/07: [hotfix] Fix Pojo comparator field access

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2b42645ada3c57d53865fac4531e45019f2632f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:14:40 2020 +0200

    [hotfix] Fix Pojo comparator field access
    
    The PojoComparator assumes it can access a field of a pojo directly,
    i.e. that the field is either public or that setAccessible was called
    on it beforehand. That only holds if the record went through
    serialization, which is not the case e.g. in CollectionExecution mode.
    
    Starting with this commit, all fields are made accessible in the
    PojoComparator constructor.
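    
    A minimal standalone sketch of the pattern this fix applies (a
    hypothetical KeyExtractor class, not the actual PojoComparator): the
    reflected key fields are made accessible once in the constructor, so
    Field.get() cannot throw IllegalAccessException later, regardless of
    whether the record went through serialization first.
    
        import java.lang.reflect.Field;
    
        class KeyExtractor<T> {
            private final Field[] keyFields;
    
            KeyExtractor(Field[] keyFields) {
                this.keyFields = keyFields;
                for (Field keyField : keyFields) {
                    // same call the commit adds to the PojoComparator ctor
                    keyField.setAccessible(true);
                }
            }
    
            Object accessField(int i, T object) {
                try {
                    return keyFields[i].get(object);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(
                        "Should not happen: setAccessible(true) was called in the ctor", e);
                }
            }
        }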
---
 .../flink/api/java/typeutils/runtime/PojoComparator.java       | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 529fd0d..c19bcc6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.InstantiationUtil;
 
 @Internal
 public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	// Reflection fields for the comp fields
@@ -70,6 +70,10 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		int nKeyLen = 0;
 		boolean inverted = false;
 
+		for (Field keyField : keyFields) {
+			keyField.setAccessible(true);
+		}
+
 		for (int i = 0; i < this.comparators.length; i++) {
 			TypeComparator<?> k = this.comparators[i];
 			if(k == null) {
@@ -170,7 +174,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 			}
 		}
 	}
-	
+
 	/**
 	 * This method is handling the IllegalAccess exceptions of Field.get()
 	 */
@@ -180,7 +184,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		} catch (NullPointerException npex) {
 			throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
 		} catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
+			throw new RuntimeException("This should not happen since we call setAccesssible(true) in the ctor."
 			+ " fields: " + field + " obj: " + object);
 		}
 		return object;


[flink] 02/07: [hotfix] Extract joda conversions to a separate class

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09616cba088da907f6d279f959ea8d42e0434e9b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:23:29 2020 +0200

    [hotfix] Extract joda conversions to a separate class
---
 .../formats/avro/AvroRowDeserializationSchema.java | 53 +++++++++++------
 .../formats/avro/AvroToRowDataConverters.java      | 38 ++++++------
 .../apache/flink/formats/avro/JodaConverter.java   | 67 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 34 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5e3edf7..fb7a74e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -42,10 +42,8 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -124,6 +122,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	private transient Decoder decoder;
 
 	/**
+	 * Converter for joda classes.
+	 */
+	private transient @Nullable JodaConverter jodaConverter;
+
+	/**
 	 * Creates a Avro deserialization schema for the given specific record class. Having the
 	 * concrete Avro record class might improve performance.
 	 *
@@ -139,6 +142,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new SpecificDatumReader<>(schema);
 		inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 
 	/**
@@ -158,6 +162,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new GenericDatumReader<>(schema);
 		inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 
 	@Override
@@ -196,7 +201,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 
 	// --------------------------------------------------------------------------------------------
 
-	private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+	private Row convertAvroRecordToRow(
+			Schema schema,
+			RowTypeInfo typeInfo,
+			IndexedRecord record) {
 		final List<Schema.Field> fields = schema.getFields();
 		final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
 		final int length = fields.size();
@@ -208,7 +216,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return row;
 	}
 
-	private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+	private Object convertAvroType(
+			Schema schema,
+			TypeInformation<?> info,
+			Object object) {
 		// we perform the conversion based on schema information but enriched with pre-computed
 		// type information where useful (i.e., for arrays)
 
@@ -239,7 +250,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 				for (Map.Entry<?, ?> entry : map.entrySet()) {
 					convertedMap.put(
 						entry.getKey().toString(),
-						convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
+						convertAvroType(
+							schema.getValueType(),
+							mapTypeInfo.getValueTypeInfo(),
+							entry.getValue())
+					);
 				}
 				return convertedMap;
 			case UNION:
@@ -302,10 +317,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			// adopted from Apache Calcite
 			final long t = (long) value * 86400000L;
 			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertDate(object);
 		} else {
-			// use 'provided' Joda time
-			final LocalDate value = (LocalDate) object;
-			millis = value.toDate().getTime();
+			throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
 		}
 		return new Date(millis);
 	}
@@ -314,10 +329,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertTime(object);
 		} else {
-			// use 'provided' Joda time
-			final LocalTime value = (LocalTime) object;
-			millis = (long) value.get(DateTimeFieldType.millisOfDay());
+			throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
 		}
 		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
@@ -326,15 +341,18 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Long) {
 			millis = (Long) object;
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertTimestamp(object);
 		} else {
-			// use 'provided' Joda time
-			final DateTime value = (DateTime) object;
-			millis = value.toDate().getTime();
+			throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
 		}
 		return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
 	}
 
-	private Object[] convertToObjectArray(Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+	private Object[] convertToObjectArray(
+			Schema elementSchema,
+			TypeInformation<?> elementInfo,
+			Object object) {
 		final List<?> list = (List<?>) object;
 		final Object[] convertedArray = (Object[]) Array.newInstance(
 			elementInfo.getTypeClass(),
@@ -364,5 +382,6 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new SpecificDatumReader<>(schema);
 		this.inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 57f7629..b340cba 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -36,9 +36,6 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
 
 import java.io.Serializable;
 import java.lang.reflect.Array;
@@ -49,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
-import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY;
 
 /** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/
 @Internal
@@ -178,10 +174,8 @@ public class AvroToRowDataConverters {
 	}
 
 	private static AvroToRowDataConverter createMapConverter(LogicalType type) {
-		final AvroToRowDataConverter keyConverter = createConverter(
-				DataTypes.STRING().getLogicalType());
-		final AvroToRowDataConverter valueConverter = createConverter(
-				extractValueTypeToAvroMap(type));
+		final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType());
+		final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type));
 
 		return avroObject -> {
 			final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -200,9 +194,13 @@ public class AvroToRowDataConverters {
 		if (object instanceof Long) {
 			millis = (Long) object;
 		} else {
-			// use 'provided' Joda time
-			final DateTime value = (DateTime) object;
-			millis = value.toDate().getTime();
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				millis = jodaConverter.convertTimestamp(object);
+			} else {
+				throw new IllegalArgumentException(
+					"Unexpected object type for TIMESTAMP logical type. Received: " + object);
+			}
 		}
 		return toTimestampData(millis);
 	}
@@ -211,9 +209,12 @@ public class AvroToRowDataConverters {
 		if (object instanceof Integer) {
 			return (Integer) object;
 		} else {
-			// use 'provided' Joda time
-			final LocalDate value = (LocalDate) object;
-			return (int) (toTimestampData(value.toDate().getTime()).getMillisecond() / MILLIS_PER_DAY);
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				return (int) jodaConverter.convertDate(object);
+			} else {
+				throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
+			}
 		}
 	}
 
@@ -226,9 +227,12 @@ public class AvroToRowDataConverters {
 		if (object instanceof Integer) {
 			millis = (Integer) object;
 		} else {
-			// use 'provided' Joda time
-			final org.joda.time.LocalTime value = (org.joda.time.LocalTime) object;
-			millis = value.get(DateTimeFieldType.millisOfDay());
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				millis = jodaConverter.convertTime(object);
+			} else {
+				throw new IllegalArgumentException("Unexpected object type for TIME logical type. Received: " + object);
+			}
 		}
 		return millis;
 	}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
new file mode 100644
index 0000000..85cbabc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates joda optional dependency. Instantiates this class only if joda is available on the classpath.
+ */
+class JodaConverter {
+
+	private static JodaConverter instance;
+	private static boolean instantiated = false;
+
+	public static JodaConverter getConverter() {
+		if (instantiated) {
+			return instance;
+		}
+
+		try {
+			Class.forName("org.joda.time.DateTime", false, Thread.currentThread().getContextClassLoader());
+			instance = new JodaConverter();
+		} catch (ClassNotFoundException e) {
+			instance = null;
+		} finally {
+			instantiated = true;
+		}
+		return instance;
+	}
+
+	public long convertDate(Object object) {
+		final LocalDate value = (LocalDate) object;
+		return value.toDate().getTime();
+	}
+
+	public int convertTime(Object object) {
+		final LocalTime value = (LocalTime) object;
+		return value.get(DateTimeFieldType.millisOfDay());
+	}
+
+	public long convertTimestamp(Object object) {
+		final DateTime value = (DateTime) object;
+		return value.toDate().getTime();
+	}
+
+	private JodaConverter() {
+	}
+}
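
A minimal sketch of the detection idiom used by JodaConverter.getConverter()
above (a standalone illustration, not part of the commit): the three-argument
Class.forName probes the classpath without initializing the class, so a
missing joda-time surfaces as a plain ClassNotFoundException instead of a
hard NoClassDefFoundError at first use.

    class OptionalDependencyProbe {
        // Probe for an optional dependency without initializing it; the
        // result would typically be cached, as getConverter() does with
        // its 'instantiated' flag.
        static boolean isJodaOnClasspath() {
            try {
                Class.forName(
                    "org.joda.time.DateTime",
                    false,
                    Thread.currentThread().getContextClassLoader());
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }
    }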


[flink] 03/07: [hotfix] Fix schema to DataType/Type conversion

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b025db621a56c6f61482c0fbb2fdb91381422904
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 16:30:16 2020 +0200

    [hotfix] Fix schema to DataType/Type conversion
---
 .../avro/typeutils/AvroSchemaConverter.java        | 22 ++++++++++------------
 .../avro/typeutils/AvroSchemaConverterTest.java    | 10 +++++-----
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 44c5d84..332d18e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
 				// logical timestamp type
 				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
 					return Types.SQL_TIMESTAMP;
+				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+					return Types.SQL_TIME;
 				}
 				return Types.LONG;
 			case FLOAT:
@@ -255,28 +257,24 @@ public class AvroSchemaConverter {
 						decimalType.getScale())
 						.notNull();
 			}
-			return DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class))
-					.notNull();
+			return DataTypes.BYTES().notNull();
 		case INT:
 			// logical date and time type
 			final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
 			if (logicalType == LogicalTypes.date()) {
-				return DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull();
+				return DataTypes.DATE().notNull();
 			} else if (logicalType == LogicalTypes.timeMillis()) {
-				return DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull();
+				return DataTypes.TIME().notNull();
 			}
 			return DataTypes.INT().notNull();
 		case LONG:
 			// logical timestamp type
 			if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-				return DataTypes.TIMESTAMP(3)
-						.bridgedTo(java.sql.Timestamp.class)
-						.notNull();
-			}
-			if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-				return DataTypes.TIMESTAMP(6)
-						.bridgedTo(java.sql.Timestamp.class)
-						.notNull();
+				return DataTypes.TIMESTAMP(3).notNull();
+			} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+				return DataTypes.TIMESTAMP(6).notNull();
+			} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+				return DataTypes.TIME(6).notNull();
 			}
 			return DataTypes.BIGINT().notNull();
 		case FLOAT:
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 95b5696..98efd6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -190,14 +190,14 @@ public class AvroSchemaConverterTest {
 				DataTypes.FIELD("type_fixed", DataTypes.VARBINARY(16)),
 				DataTypes.FIELD("type_union", DataTypes.RAW(Types.GENERIC(Object.class)).notNull()),
 				DataTypes.FIELD("type_nested", address),
-				DataTypes.FIELD("type_bytes", DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class)).notNull()),
-				DataTypes.FIELD("type_date", DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull()),
-				DataTypes.FIELD("type_time_millis", DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull()),
+				DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
+				DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
+				DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
 				DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
 				DataTypes.FIELD("type_timestamp_millis",
-						DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).notNull()),
+						DataTypes.TIMESTAMP(3).notNull()),
 				DataTypes.FIELD("type_timestamp_micros",
-						DataTypes.TIMESTAMP(6).bridgedTo(java.sql.Timestamp.class).notNull()),
+						DataTypes.TIMESTAMP(6).notNull()),
 				DataTypes.FIELD("type_decimal_bytes", DataTypes.DECIMAL(4, 2).notNull()),
 				DataTypes.FIELD("type_decimal_fixed", DataTypes.DECIMAL(4, 2).notNull()))
 				.notNull();
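
In short, after this change the schema-to-DataType conversion maps avro
bytes to BYTES, date to DATE, time-millis to TIME, timestamp-millis to
TIMESTAMP(3), timestamp-micros to TIMESTAMP(6), and the previously
unhandled time-micros (on a long base type) to TIME(6), with SQL_TIME as
the TypeInformation counterpart; the bridgedTo(java.sql.*) conversion
classes are dropped in favor of the default conversion classes.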


[flink] 04/07: [hotfix] Fix time-micros and timestamp-micros handling

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ac804852237aa36a0477757a3cdacd9b69e7dc5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 18:13:53 2020 +0200

    [hotfix] Fix time-micros and timestamp-micros handling
---
 .../formats/avro/AvroRowDeserializationSchema.java | 24 +++++++++++++---
 .../formats/avro/AvroRowSerializationSchema.java   | 32 ++++++++++++++++++----
 .../avro/typeutils/AvroSchemaConverter.java        |  2 ++
 .../flink/formats/avro/AvroOutputFormatITCase.java |  2 +-
 .../flink/formats/avro/AvroOutputFormatTest.java   |  2 +-
 .../formats/avro/AvroRecordInputFormatTest.java    |  2 +-
 .../avro/AvroSplittableInputFormatTest.java        |  4 +--
 .../flink/formats/avro/EncoderDecoderTest.java     |  2 +-
 .../avro/typeutils/AvroSchemaConverterTest.java    |  6 ++--
 .../flink/formats/avro/utils/AvroTestUtils.java    | 16 +++++++----
 .../formats/avro/utils/TestDataGenerator.java      |  2 +-
 .../flink-avro/src/test/resources/avro/user.avsc   |  2 +-
 12 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index fb7a74e..5453f67 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -75,12 +75,13 @@ import java.util.TimeZone;
  */
 @PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
-
 	/**
 	 * Used for time conversions into SQL types.
 	 */
 	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
 
+	private static final long MICROS_PER_SECOND = 1_000_000L;
+
 	/**
 	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
@@ -294,7 +295,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 				return object;
 			case LONG:
 				if (info == Types.SQL_TIMESTAMP) {
-					return convertToTimestamp(object);
+					return convertToTimestamp(object, schema.getLogicalType() == LogicalTypes.timestampMicros());
+				} else if (info == Types.SQL_TIME) {
+					return convertToTime(object);
 				}
 				return object;
 			case FLOAT:
@@ -329,6 +332,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (object instanceof Long) {
+			millis = (Long) object / 1000L;
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTime(object);
 		} else {
@@ -337,10 +342,21 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
 
-	private Timestamp convertToTimestamp(Object object) {
+	private Timestamp convertToTimestamp(Object object, boolean isMicros) {
 		final long millis;
 		if (object instanceof Long) {
-			millis = (Long) object;
+			if (isMicros) {
+				long micros = (Long) object;
+				int offsetMillis = LOCAL_TZ.getOffset(micros / 1000L);
+
+				long seconds = micros / MICROS_PER_SECOND - offsetMillis / 1000;
+				int nanos = ((int) (micros % MICROS_PER_SECOND)) * 1000 - offsetMillis % 1000 * 1000;
+				Timestamp timestamp = new Timestamp(seconds * 1000L);
+				timestamp.setNanos(nanos);
+				return timestamp;
+			} else {
+				millis = (Long) object;
+			}
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTimestamp(object);
 		} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 30f9754..7503e8e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -248,9 +248,9 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 				} else if (object instanceof LocalDate) {
 					return convertFromDate(schema, Date.valueOf((LocalDate) object));
 				} else if (object instanceof Time) {
-					return convertFromTime(schema, (Time) object);
+					return convertFromTimeMillis(schema, (Time) object);
 				} else if (object instanceof LocalTime) {
-					return convertFromTime(schema, Time.valueOf((LocalTime) object));
+					return convertFromTimeMillis(schema, Time.valueOf((LocalTime) object));
 				}
 				return object;
 			case LONG:
@@ -259,6 +259,8 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 					return convertFromTimestamp(schema, (Timestamp) object);
 				} else if (object instanceof LocalDateTime) {
 					return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
+				} else if (object instanceof Time) {
+					return convertFromTimeMicros(schema, (Time) object);
 				}
 				return object;
 			case FLOAT:
@@ -295,29 +297,49 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		}
 	}
 
-	private int convertFromTime(Schema schema, Time date) {
+	private int convertFromTimeMillis(Schema schema, Time date) {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.timeMillis()) {
 			// adopted from Apache Calcite
-			final long time = date.getTime();
-			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			final long converted = toEpochMillis(date);
 			return (int) (converted % 86400000L);
 		} else {
 			throw new RuntimeException("Unsupported time type.");
 		}
 	}
 
+	private long convertFromTimeMicros(Schema schema, Time date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timeMicros()) {
+			// adopted from Apache Calcite
+			final long converted = toEpochMillis(date);
+			return (converted % 86400000L) * 1000L;
+		} else {
+			throw new RuntimeException("Unsupported time type.");
+		}
+	}
+
 	private long convertFromTimestamp(Schema schema, Timestamp date) {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.timestampMillis()) {
 			// adopted from Apache Calcite
 			final long time = date.getTime();
 			return time + (long) LOCAL_TZ.getOffset(time);
+		} else if (logicalType == LogicalTypes.timestampMicros()) {
+			long millis = date.getTime();
+			long micros = millis * 1000 + (date.getNanos() % 1_000_000 / 1000);
+			long offset = LOCAL_TZ.getOffset(millis) * 1000L;
+			return micros + offset;
 		} else {
 			throw new RuntimeException("Unsupported timestamp type.");
 		}
 	}
 
+	private long toEpochMillis(java.util.Date date) {
+		final long time = date.getTime();
+		return time + (long) LOCAL_TZ.getOffset(time);
+	}
+
 	private void writeObject(ObjectOutputStream outputStream) throws IOException {
 		outputStream.writeObject(recordClazz);
 		outputStream.writeObject(schemaString); // support for null
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 332d18e..e63a623 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
 				// logical timestamp type
 				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
 					return Types.SQL_TIMESTAMP;
+				} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+					return Types.SQL_TIMESTAMP;
 				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
 					return Types.SQL_TIME;
 				}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index dd901d0..c0ef02c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -158,7 +158,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index f07c36b..47535e9 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -172,7 +172,7 @@ public class AvroOutputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 84849a6..ebb584d 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -139,7 +139,7 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimeMicros(123456L);
 		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user1.setTypeTimestampMicros(123456L);
 		// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index fee81a8..e78b3b2 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -117,7 +117,7 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimeMicros(123456L);
 		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user1.setTypeTimestampMicros(123456L);
 		// 20.00
@@ -183,7 +183,7 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 49ef985..c945d25 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -270,7 +270,7 @@ public class EncoderDecoderTest {
 			ByteBuffer.wrap(b),
 			LocalDate.parse("2014-03-01"),
 			LocalTime.parse("12:12:12"),
-			123456,
+			123456L,
 			DateTime.parse("2014-03-01T12:12:12.321Z"),
 			123456L,
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 98efd6e..c1cada7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -151,9 +151,9 @@ public class AvroSchemaConverterTest {
 			Types.PRIMITIVE_ARRAY(Types.BYTE),
 			Types.SQL_DATE,
 			Types.SQL_TIME,
-			Types.INT,
+			Types.SQL_TIME,
+			Types.SQL_TIMESTAMP,
 			Types.SQL_TIMESTAMP,
-			Types.LONG,
 			Types.BIG_DEC,
 			Types.BIG_DEC);
 
@@ -193,7 +193,7 @@ public class AvroSchemaConverterTest {
 				DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
 				DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
 				DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
-				DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
+				DataTypes.FIELD("type_time_micros", DataTypes.TIME(6).notNull()),
 				DataTypes.FIELD("type_timestamp_millis",
 						DataTypes.TIMESTAMP(3).notNull()),
 				DataTypes.FIELD("type_timestamp_micros",
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 9d77f32..30e51bc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -121,9 +121,11 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, 123456);
+		rowUser.setField(18, new Time(123)); // we truncate micros
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		rowUser.setField(20, 123456L);
+		Timestamp timestampMicros = new Timestamp(0);
+		timestampMicros.setNanos(123_456_000);
+		rowUser.setField(20, timestampMicros);
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
@@ -156,7 +158,7 @@ public final class AvroTestUtils {
 			"{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
 			"\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
 			"{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
-			"\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+			"\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
 			"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
 			"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
 			"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
@@ -195,7 +197,7 @@ public final class AvroTestUtils {
 		user.put("type_bytes", ByteBuffer.allocate(10));
 		user.put("type_date", LocalDate.parse("2014-03-01"));
 		user.put("type_time_millis", LocalTime.parse("12:12:12"));
-		user.put("type_time_micros", 123456);
+		user.put("type_time_micros", 123456L);
 		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user.put("type_timestamp_micros", 123456L);
 		user.put("type_decimal_bytes",
@@ -224,9 +226,11 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, 123456);
+		rowUser.setField(18, new Time(123)); // we truncate micros
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		rowUser.setField(20, 123456L);
+		Timestamp timestampMicros = new Timestamp(0);
+		timestampMicros.setNanos(123_456_000);
+		rowUser.setField(20, timestampMicros);
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index a4c5bf8..db0452c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -61,7 +61,7 @@ public class TestDataGenerator {
 				generateRandomBytes(rnd),
 				LocalDate.parse("2014-03-01"),
 				LocalTime.parse("12:12:12"),
-				123456,
+				123456L,
 				DateTime.parse("2014-03-01T12:12:12.321Z"),
 				123456L,
 				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index cb9e9b2..4d59e5b 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -34,7 +34,7 @@
      {"name": "type_bytes", "type": "bytes"},
      {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
      {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}},
-     {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}},
+     {"name": "type_time_micros", "type": {"type": "long", "logicalType": "time-micros"}},
      {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
      {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
      {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},


[flink] 06/07: [FLINK-18192] Upgrade avro to 1.10

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 115f85220abb89f3b9d179e0ea2e3b6a3be4c7af
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:28:25 2020 +0200

    [FLINK-18192] Upgrade avro to 1.10
    
    This commit upgrades the default avro version used by flink-avro. It
    should be possible to downgrade the avro version in a user job, as the
    binary format is compatible and we do not expose any avro dependencies
    in the API.
    
    Additionally, this commit fixes the handling of the logical types
    time-micros and timestamp-micros, as well as the interpretation of
    timestamp-millis in the AvroRowDataDeserializationSchema.
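    
    A hypothetical user-job pom.xml snippet for such a downgrade (the
    1.8.2 version here only mirrors the hive.avro.version used for tests
    in the diff below and is purely an example):
    
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>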
---
 flink-connectors/flink-connector-hive/pom.xml      | 12 ++++
 .../src/main/avro/ComplexPayloadAvro.avsc          |  2 +-
 .../RegistryAvroRowDataSeDeSchemaTest.java         |  2 +-
 .../formats/avro/AvroRowDeserializationSchema.java | 18 ++++++
 .../formats/avro/AvroRowSerializationSchema.java   |  3 +-
 .../formats/avro/AvroToRowDataConverters.java      | 17 ++++--
 .../formats/avro/RowDataToAvroConverters.java      |  2 +-
 .../avro/typeutils/AvroSchemaConverter.java        |  5 +-
 .../avro/utils/AvroKryoSerializerUtils.java        | 70 ---------------------
 .../flink/formats/avro/AvroOutputFormatITCase.java | 13 ++--
 .../flink/formats/avro/AvroOutputFormatTest.java   | 14 +++--
 .../formats/avro/AvroRecordInputFormatTest.java    | 19 +++---
 .../avro/AvroRowDataDeSerializationSchemaTest.java | 17 +++---
 .../avro/AvroSplittableInputFormatTest.java        | 25 ++++----
 .../flink/formats/avro/EncoderDecoderTest.java     | 13 ++--
 .../RegistryAvroDeserializationSchemaTest.java     |  3 -
 .../avro/typeutils/AvroTypeExtractionTest.java     | 71 ++++++++++++++--------
 .../flink/formats/avro/utils/AvroTestUtils.java    | 31 +++++-----
 .../formats/avro/utils/TestDataGenerator.java      | 14 ++---
 .../flink/table/runtime/batch/AvroTypesITCase.java | 45 +++++++-------
 .../flink-avro/src/test/resources/avro/user.avsc   |  2 +-
 pom.xml                                            |  5 +-
 22 files changed, 196 insertions(+), 207 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 8b38b97..e0c9b986 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<hiverunner.version>4.0.0</hiverunner.version>
 		<reflections.version>0.9.8</reflections.version>
 		<derby.version>10.10.2.0</derby.version>
+		<hive.avro.version>1.8.2</hive.avro.version>
 	</properties>
 
 	<!-- Overwrite hadoop dependency management from flink-parent to use locally defined Hadoop version -->
@@ -202,6 +203,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!--We downgrade the version of avro for tests. We do that because we test Hive's built-in
+			formats, which do not work with newer avro versions. We do not downgrade the version
+			permanently, because it should be safe to use hive as a catalog with newer avro
+			versions. -->
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>${hive.avro.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-hadoop-fs</artifactId>
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
index 15a801e..445f5af 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
@@ -22,7 +22,7 @@
      {
         "name": "eventTime",
         "type": "long",
-        "default": ""
+        "default": -1
      },
      {
         "name": "stringList",
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
index 1cf348a..dbcec71 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
@@ -148,7 +148,7 @@ public class RegistryAvroRowDataSeDeSchemaTest {
 		byte[] serialized = serializer.serialize(oriData);
 		RowData rowData = deserializer.deserialize(serialized);
 		assertThat(rowData.getArity(), equalTo(schema.getFields().size()));
-		assertEquals(address.getNum(), Integer.valueOf(rowData.getInt(0)));
+		assertEquals(address.getNum(), rowData.getInt(0));
 		assertEquals(address.getStreet(), rowData.getString(1).toString());
 		if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
 			assertEquals(address.getCity(), rowData.getString(2).toString());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5453f67..65d9ede 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -55,6 +55,10 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -320,6 +324,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			// adopted from Apache Calcite
 			final long t = (long) value * 86400000L;
 			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else if (object instanceof LocalDate) {
+			long t = ((LocalDate) object).toEpochDay() * 86400000L;
+			millis = t - (long) LOCAL_TZ.getOffset(t);
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertDate(object);
 		} else {
@@ -334,6 +341,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			millis = (Integer) object;
 		} else if (object instanceof Long) {
 			millis = (Long) object / 1000L;
+		} else if (object instanceof LocalTime) {
+			millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTime(object);
 		} else {
@@ -357,6 +366,15 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			} else {
 				millis = (Long) object;
 			}
+		} else if (object instanceof Instant) {
+			Instant instant = (Instant) object;
+			int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
+
+			long seconds = instant.getEpochSecond() - offsetMillis / 1000;
+			int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
+			Timestamp timestamp = new Timestamp(seconds * 1000L);
+			timestamp.setNanos(nanos);
+			return timestamp;
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTimestamp(object);
 		} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 7503e8e..6bc6af1 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -289,8 +289,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.date()) {
 			// adopted from Apache Calcite
-			final long time = date.getTime();
-			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			final long converted = toEpochMillis(date);
 			return (int) (converted / 86400000L);
 		} else {
 			throw new RuntimeException("Unsupported date type.");
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index b340cba..346ccf6 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -40,7 +40,10 @@ import org.apache.avro.generic.IndexedRecord;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
-import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -193,6 +196,8 @@ public class AvroToRowDataConverters {
 		final long millis;
 		if (object instanceof Long) {
 			millis = (Long) object;
+		} else if (object instanceof Instant) {
+			millis = ((Instant) object).toEpochMilli();
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
@@ -202,12 +207,14 @@ public class AvroToRowDataConverters {
 					"Unexpected object type for TIMESTAMP logical type. Received: " + object);
 			}
 		}
-		return toTimestampData(millis);
+		return TimestampData.fromEpochMillis(millis);
 	}
 
 	private static int convertToDate(Object object) {
 		if (object instanceof Integer) {
 			return (Integer) object;
+		} else if (object instanceof LocalDate) {
+			return (int) ((LocalDate) object).toEpochDay();
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
@@ -218,14 +225,12 @@ public class AvroToRowDataConverters {
 		}
 	}
 
-	private static TimestampData toTimestampData(long timeZoneMills) {
-		return TimestampData.fromTimestamp(new Timestamp(timeZoneMills));
-	}
-
 	private static int convertToTime(Object object) {
 		final int millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (object instanceof LocalTime) {
+			millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index 0afb2d26..278956b 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -119,7 +119,7 @@ public class RowDataToAvroConverters {
 			converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
 			break;
 		case TIMESTAMP_WITHOUT_TIME_ZONE:
-			converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+			converter = (schema, object) -> ((TimestampData) object).toInstant().toEpochMilli();
 			break;
 		case DECIMAL:
 			converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index e63a623..d1438eb 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -155,9 +155,8 @@ public class AvroSchemaConverter {
 				return Types.INT;
 			case LONG:
 				// logical timestamp type
-				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-					return Types.SQL_TIMESTAMP;
-				} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+				if (schema.getLogicalType() == LogicalTypes.timestampMillis() ||
+						schema.getLogicalType() == LogicalTypes.timestampMicros()) {
 					return Types.SQL_TIMESTAMP;
 				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
 					return Types.SQL_TIME;
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index ba125c2..5744abc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -33,11 +33,6 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.joda.time.Chronology;
-import org.joda.time.DateTimeZone;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-import org.joda.time.chrono.ISOChronology;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -108,69 +103,4 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			return sParser.parse(schemaAsString);
 		}
 	}
-
-	/**
-	 * Avro logical types use JodaTime's LocalDate but Kryo is unable to serialize it
-	 * properly (esp. visible after calling the toString() method).
-	 */
-	public static class JodaLocalDateSerializer extends Serializer<LocalDate> {
-
-		public JodaLocalDateSerializer() {
-			setImmutable(true);
-		}
-
-		@Override
-		public void write(Kryo kryo, Output output, LocalDate localDate) {
-			output.writeInt(localDate.getYear());
-			output.writeInt(localDate.getMonthOfYear());
-			output.writeInt(localDate.getDayOfMonth());
-
-			final Chronology chronology = localDate.getChronology();
-			if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
-				throw new RuntimeException("Unsupported chronology: " + chronology);
-			}
-		}
-
-		@Override
-		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> aClass) {
-			final int y = input.readInt();
-			final int m = input.readInt();
-			final int d = input.readInt();
-
-			return new LocalDate(
-				y,
-				m,
-				d,
-				null);
-		}
-	}
-
-	/**
-	 * Avro logical types use JodaTime's LocalTime but Kryo is unable to serialize it
-	 * properly (esp. visible after calling the toString() method).
-	 */
-	public static class JodaLocalTimeSerializer extends Serializer<LocalTime> {
-
-		@Override
-		public void write(Kryo kryo, Output output, LocalTime object) {
-			final int time = object.getMillisOfDay();
-			output.writeInt(time, true);
-
-			final Chronology chronology = object.getChronology();
-			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
-				throw new RuntimeException("Unsupported chronology: " + chronology);
-			}
-		}
-
-		@Override
-		public LocalTime read(Kryo kryo, Input input, Class<LocalTime> type) {
-			final int time = input.readInt(true);
-			return new LocalTime(time, ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
-		}
-
-		@Override
-		public LocalTime copy(Kryo kryo, LocalTime original) {
-			return new LocalTime(original);
-		}
-	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index c0ef02c..6d97b54 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -32,14 +32,15 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Assert;
 
 import java.io.File;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -158,9 +159,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index 47535e9..2b843fb 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -44,6 +41,10 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -172,9 +173,10 @@ public class AvroOutputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index ebb584d..319ff34 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -48,9 +48,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,6 +58,10 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -139,9 +140,9 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456L);
-		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user1.setTypeTimestampMicros(123456L);
+		user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		// 20.00
 		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		// 20.00
@@ -170,9 +171,9 @@ public class AvroRecordInputFormatTest {
 				.setTypeBytes(ByteBuffer.allocate(10))
 				.setTypeDate(LocalDate.parse("2014-03-01"))
 				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-				.setTypeTimeMicros(123456)
-				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-				.setTypeTimestampMicros(123456L)
+				.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+				.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 				// 20.00
 				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 89fcceb..370396f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.generated.JodaTimeRecord;
+import org.apache.flink.formats.avro.generated.LogicalTimeRecord;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -35,15 +35,15 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -164,11 +164,12 @@ public class AvroRowDataDeSerializationSchemaTest {
 
 	@Test
 	public void testSpecificType() throws Exception {
-		JodaTimeRecord record = new JodaTimeRecord();
-		record.setTypeTimestampMillis(DateTime.parse("2010-06-30T01:20:20"));
+		LogicalTimeRecord record = new LogicalTimeRecord();
+		Instant timestamp = Instant.parse("2010-06-30T01:20:20Z");
+		record.setTypeTimestampMillis(timestamp);
 		record.setTypeDate(LocalDate.parse("2014-03-01"));
 		record.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		SpecificDatumWriter<JodaTimeRecord> datumWriter = new SpecificDatumWriter<>(JodaTimeRecord.class);
+		SpecificDatumWriter<LogicalTimeRecord> datumWriter = new SpecificDatumWriter<>(LogicalTimeRecord.class);
 		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
 		Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
 		datumWriter.write(record, encoder);
@@ -191,7 +192,7 @@ public class AvroRowDataDeSerializationSchemaTest {
 		byte[] output = serializationSchema.serialize(rowData);
 		RowData rowData2 = deserializationSchema.deserialize(output);
 		Assert.assertEquals(rowData, rowData2);
-		Assert.assertEquals("2010-06-30T01:20:20", rowData.getTimestamp(0, 3).toString());
+		Assert.assertEquals(timestamp, rowData.getTimestamp(0, 3).toInstant());
 		Assert.assertEquals("2014-03-01", DataFormatConverters.LocalDateConverter.INSTANCE.toExternal(
 				rowData.getInt(1)).toString());
 		Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index e78b3b2..39b5db6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.flink.formats.avro.generated.User;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +40,10 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
@@ -117,9 +118,9 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456L);
-		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user1.setTypeTimestampMicros(123456L);
+		user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		// 20.00
 		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		// 20.00
@@ -148,9 +149,9 @@ public class AvroSplittableInputFormatTest {
 				.setTypeBytes(ByteBuffer.allocate(10))
 				.setTypeDate(LocalDate.parse("2014-03-01"))
 				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-				.setTypeTimeMicros(123456)
-				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-				.setTypeTimestampMicros(123456L)
+				.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+				.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 				// 20.00
 				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				// 20.00
@@ -183,9 +184,9 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index c945d25..835dc94 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -29,9 +29,6 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -40,6 +37,10 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -270,9 +271,9 @@ public class EncoderDecoderTest {
 			ByteBuffer.wrap(b),
 			LocalDate.parse("2014-03-01"),
 			LocalTime.parse("12:12:12"),
-			123456L,
-			DateTime.parse("2014-03-01T12:12:12.321Z"),
-			123456L,
+			LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+			Instant.parse("2014-03-01T12:12:12.321Z"),
+			Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
 			new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
index 46419e9..2cd7c55 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -73,9 +73,6 @@ public class RegistryAvroDeserializationSchemaTest {
 			Address.getClassSchema()));
 		assertEquals(address.getNum(), genericRecord.get("num"));
 		assertEquals(address.getStreet(), genericRecord.get("street").toString());
-		assertNull(genericRecord.get("city"));
-		assertNull(genericRecord.get("state"));
-		assertNull(genericRecord.get("zip"));
 		assertNull(genericRecord.get("country"));
 	}
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index ccba0a5..7618452 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -89,19 +89,30 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
-			"\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
-			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
-			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
-			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
-			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
-			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
-			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
-			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
-			"\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
-			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
-			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
-			"\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, " +
+			"\"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", " +
+			"\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", " +
+			"\"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, " +
+			"\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+			"\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, " +
+			"\"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+			"\"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n";
 	}
 
 	@Test
@@ -123,17 +134,29 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
-			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
-			"\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
-			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
-			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
-			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
-			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
-			"\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
-			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
-			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
-			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null," +
+			" \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null," +
+			" \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"]," +
+			" \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\"," +
+			" \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null," +
+			" \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\"," +
+			" \"state\": \"London\", \"zip\": \"NW1 6XE\"}," +
+			" \"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, " +
+			"\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+			"\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", " +
+			"\"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+			"\"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
 			"\"type_decimal_fixed\": [7, -48]}\n";
 
 	}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 30e51bc..dcc0fb3 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -33,9 +33,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -44,6 +41,10 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -90,9 +91,9 @@ public final class AvroTestUtils {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			// byte array must contain the two's-complement representation of the
 			// unscaled integer value in big-endian byte order
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
@@ -121,11 +122,9 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, new Time(123)); // we truncate micros
+		rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		Timestamp timestampMicros = new Timestamp(0);
-		timestampMicros.setNanos(123_456_000);
-		rowUser.setField(20, timestampMicros);
+		rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
@@ -197,9 +196,9 @@ public final class AvroTestUtils {
 		user.put("type_bytes", ByteBuffer.allocate(10));
 		user.put("type_date", LocalDate.parse("2014-03-01"));
 		user.put("type_time_millis", LocalTime.parse("12:12:12"));
-		user.put("type_time_micros", 123456L);
-		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user.put("type_timestamp_micros", 123456L);
+		user.put("type_time_micros", LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
+		user.put("type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		user.put("type_decimal_bytes",
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		user.put("type_decimal_fixed",
@@ -226,11 +225,9 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, new Time(123)); // we truncate micros
+		rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		Timestamp timestampMicros = new Timestamp(0);
-		timestampMicros.setNanos(123_456_000);
-		rowUser.setField(20, timestampMicros);
+		rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index db0452c..aab530b 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -25,12 +25,12 @@ import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.generated.User;
 
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,9 +61,9 @@ public class TestDataGenerator {
 				generateRandomBytes(rnd),
 				LocalDate.parse("2014-03-01"),
 				LocalTime.parse("12:12:12"),
-				123456L,
-				DateTime.parse("2014-03-01T12:12:12.321Z"),
-				123456L,
+				LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+				Instant.parse("2014-03-01T12:12:12.321Z"),
+				Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
 				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
 				new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 	}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 4d93f6e..c8c71d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
-import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,15 +35,16 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -86,9 +86,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -107,12 +107,13 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeMap(new HashMap<>())
 			.setTypeFixed(new Fixed16())
 			.setTypeUnion(null)
-			.setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeNested(null)
+			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -135,9 +136,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -151,9 +152,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 	@Test
 	public void testAvroToRow() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
-		env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
-		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
 		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
@@ -166,19 +167,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 		String expected =
 			"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
 			"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
-			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
-			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,00:00:00.123456," +
+			"12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null\n" +
 			"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
 			// TODO we should get an Avro record here instead of a nested row. This should be fixed
 			// with FLIP-136
-			"Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
+			"Berlin,42,Berlin,Bakerstreet,12049,null,null,00:00:00.123456,12:12:12,1970-01-01T00:00:00.123456Z," +
 			"2014-03-01T12:12:12.321Z,null\n" +
 			"yellow,null,Terminator,[false],[world],false," +
 			"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
-			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
-			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,00:00:00.123456," +
+			"12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null";
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 4d59e5b..aa7f638 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -102,7 +102,7 @@
 },
   {"namespace": "org.apache.flink.formats.avro.generated",
    "type": "record",
-   "name": "JodaTimeRecord",
+   "name": "LogicalTimeRecord",
    "fields": [
        {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
diff --git a/pom.xml b/pom.xml
index 7afb7a9..1ad4a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@ under the License.
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.10.1</jackson.version>
 		<prometheus.version>0.8.1</prometheus.version>
-		<avro.version>1.8.2</avro.version>
+		<avro.version>1.10.0</avro.version>
 		<javax.activation.api.version>1.2.0</javax.activation.api.version>
 		<jaxb.api.version>2.3.1</jaxb.api.version>
 		<junit.version>4.12</junit.version>
@@ -468,7 +468,8 @@ under the License.
 				<version>3.4.0</version>
 			</dependency>
 
-			<!-- Make sure we use a consistent avro version between Flink and Hadoop -->
+			<!-- We no longer align the avro version with the version bundled in Hadoop.
+			 Users might need to downgrade the avro version for a particular Hadoop version. -->
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>


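Note on the upgrade above: with Avro 1.10 the generated accessors for logical
types take java.time values (LocalDate/LocalTime/Instant) instead of Joda-Time,
which is why the test fixtures are now built with JDK time types, and why
avro.version is no longer kept in lock-step with the version bundled in Hadoop
(users pinning an older Hadoop may need to override avro.version themselves).
For reference, a minimal JDK-only sketch of how the fixture values used
throughout these tests are constructed (the class name is illustrative):

    import java.time.Instant;
    import java.time.LocalTime;
    import java.time.temporal.ChronoField;
    import java.time.temporal.ChronoUnit;

    public class LogicalTypeValues {
        public static void main(String[] args) {
            // time-micros: 123456 microseconds after midnight
            LocalTime timeMicros = LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS);
            System.out.println(timeMicros);   // 00:00:00.123456

            // timestamp-micros: 123456 microseconds after the epoch
            Instant tsMicros = Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS);
            System.out.println(tsMicros);     // 1970-01-01T00:00:00.123456Z

            // millis-of-day, as read by the new convertToTime() path
            System.out.println(LocalTime.parse("12:12:12").get(ChronoField.MILLI_OF_DAY)); // 43932000
        }
    }
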
[flink] 05/07: [hotfix] Migrate AvroTypesITCase to blink planner

Posted by dw...@apache.org.

commit fb0201d88f2645a8c0317517f3e1085892dd432f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 19:28:33 2020 +0200

    [hotfix] Migrate AvroTypesITCase to blink planner
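    
    Rewrites the test to use StreamTableEnvironment with the blink planner
    and to collect results via DataStreamUtils instead of the legacy
    BatchTableEnvironment/DataSet API. For reference, a minimal,
    self-contained sketch of the pattern applied here (a trivial String
    stream stands in for the generated Avro users; the class name is
    illustrative):
    
        import org.apache.flink.streaming.api.datastream.DataStreamUtils;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.apache.flink.types.Row;
        import org.apache.flink.util.CollectionUtil;
        
        import java.util.List;
        
        import static org.apache.flink.table.api.Expressions.$;
        
        public class BlinkCollectSketch {
            public static void main(String[] args) {
                StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                    env,
                    EnvironmentSettings.newInstance().useBlinkPlanner().build());
                // Build a Table from a DataStream and select all columns.
                Table result = tEnv.fromDataStream(env.fromElements("a", "b", "c"))
                    .select($("*"));
                // Collect the append stream back into a local list.
                List<Row> rows = CollectionUtil.iteratorToList(
                    DataStreamUtils.collect(tEnv.toAppendStream(result, Row.class)));
                rows.forEach(System.out::println);
            }
        }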
---
 .../avro/utils/AvroKryoSerializerUtils.java        |  2 +-
 .../flink/table/runtime/batch/AvroTypesITCase.java | 78 ++++++++++++++--------
 2 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index b871dbc..ba125c2 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -126,7 +126,7 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			output.writeInt(localDate.getDayOfMonth());
 
 			final Chronology chronology = localDate.getChronology();
-			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+			if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
 				throw new RuntimeException("Unsupported chronology: " + chronology);
 			}
 		}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 7dfe9e3..4d93f6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.table.runtime.batch;
 
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
 import org.joda.time.DateTime;
@@ -48,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.junit.Assert.assertEquals;
@@ -147,15 +150,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroToRow() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
 		env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
 
-		List<Row> results = tEnv.toDataSet(result, Row.class).collect();
+		List<Row> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(
+					result,
+					Row.class)));
 		String expected =
 			"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
 			"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
@@ -163,8 +170,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
 			"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
-			"{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
-			"\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
+			// TODO we should get an Avro record here instead of a nested row. This should be fixed
+			// with FLIP-136
+			"Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
 			"2014-03-01T12:12:12.321Z,null\n" +
 			"yellow,null,Terminator,[false],[world],false," +
 			"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
@@ -176,12 +184,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroStringAccess() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("name"));
-		List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
+		List<Utf8> results = CollectionUtil.iteratorToList(result.execute().collect())
+			.stream()
+			.map(row -> (Utf8) row.getField(0))
+			.collect(Collectors.toList());
+
 		String expected = "Charlie\n" +
 				"Terminator\n" +
 				"Whatever";
@@ -190,38 +202,48 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
 	@Test
 	public void testAvroObjectAccess() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t
 				.filter($("type_nested").isNotNull())
 				.select($("type_nested").flatten())
 				.as("city", "num", "state", "street", "zip");
 
-		List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
+		List<Address> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(
+					result,
+					Address.class)));
 		String expected = USER_1.getTypeNested().toString();
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
 	@Test
 	public void testAvroToAvro() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, config());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
-		Table t = tEnv.fromDataSet(testData(env));
+		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
 
-		List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
+		List<User> results = CollectionUtil.iteratorToList(
+			DataStreamUtils.collect(
+				tEnv.toAppendStream(result, User.class)));
 		List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
 		assertEquals(expected, results);
 	}
 
-	private DataSet<User> testData(ExecutionEnvironment env) {
-		List<User> data = new ArrayList<>(3);
-		data.add(USER_1);
-		data.add(USER_2);
-		data.add(USER_3);
-		return env.fromCollection(data);
+	private DataStream<User> testData(StreamExecutionEnvironment env) {
+		return env.fromElements(
+			USER_1,
+			USER_2,
+			USER_3
+		);
 	}
 }


[flink] 07/07: [FLINK-18802] Create an uber jar for avro for sql-client

Posted by dw...@apache.org.

commit a6a4d169f649d193853fddb2c589820fc0004d07
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:29:36 2020 +0200

    [FLINK-18802] Create an uber jar for avro for sql-client
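    
    Replaces the plain flink-avro sql-jar with a dedicated flink-sql-avro
    module that shades avro, jackson and commons-compress, so the SQL Client
    needs only this single jar on its classpath. Once the jar is in lib/ (or
    added via the client's --jar option), the format is usable from DDL; a
    sketch, with illustrative schema and Kafka connector options:
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        
        public class AvroFormatSketch {
            public static void main(String[] args) {
                TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().build());
                // Declares a table whose payload is (de)serialized with the Avro format.
                tEnv.executeSql(
                    "CREATE TABLE users (\n" +
                    "  name STRING,\n" +
                    "  favorite_number INT\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'users',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'format' = 'avro'\n" +
                    ")");
            }
        }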
---
 docs/dev/table/connectors/formats/avro.md          |  3 +-
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  6 +-
 .../tests/util/kafka/SQLClientKafkaITCase.java     |  4 -
 .../flink-sql-client-test/pom.xml                  |  6 +-
 .../test-scripts/test_sql_client.sh                |  3 +-
 flink-formats/flink-avro/pom.xml                   | 31 --------
 flink-formats/flink-sql-avro/pom.xml               | 85 ++++++++++++++++++++++
 .../src/main/resources/META-INF/NOTICE             | 13 ++++
 flink-formats/pom.xml                              |  1 +
 9 files changed, 106 insertions(+), 46 deletions(-)

diff --git a/docs/dev/table/connectors/formats/avro.md b/docs/dev/table/connectors/formats/avro.md
index 017c96c..a14b908 100644
--- a/docs/dev/table/connectors/formats/avro.md
+++ b/docs/dev/table/connectors/formats/avro.md
@@ -38,8 +38,7 @@ In order to setup the Avro format, the following table provides dependency infor
 
 <div class="codetabs" markdown="1">
 <div data-lang="SQL Client JAR" markdown="1">
-You can download flink-avro from [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar),
-and requires additional [Hadoop dependency]({% link ops/deployment/hadoop.md %}) for cluster execution.
+You can download flink-sql-avro from [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/{{site.version}}/flink-sql-avro-{{site.version}}.jar).
 </div>
 <div data-lang="Maven dependency" markdown="1">
 {% highlight xml %}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index f629cd5..26ae6fb 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -105,9 +105,8 @@ under the License.
 		<dependency>
 			<!-- Used by maven-dependency-plugin -->
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
+			<artifactId>flink-sql-avro</artifactId>
 			<version>${project.version}</version>
-			<classifier>sql-jar</classifier>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -177,9 +176,8 @@ under the License.
 						</artifactItem>
 						<artifactItem>
 							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-avro</artifactId>
+							<artifactId>flink-sql-avro</artifactId>
 							<version>${project.version}</version>
-							<classifier>sql-jar</classifier>
 							<destFileName>avro.jar</destFileName>
 							<type>jar</type>
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 96f0af5..d86975a 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -127,10 +127,6 @@ public class SQLClientKafkaITCase extends TestLogger {
 		Path tmpPath = tmp.getRoot().toPath();
 		LOG.info("The current temporary path: {}", tmpPath);
 		this.result = tmpPath.resolve("result");
-
-		apacheAvroJars.add(DOWNLOAD_CACHE.getOrDownload("https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar", tmpPath));
-		apacheAvroJars.add(DOWNLOAD_CACHE.getOrDownload("https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar", tmpPath));
-		apacheAvroJars.add(DOWNLOAD_CACHE.getOrDownload("https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar", tmpPath));
 	}
 
 	@Test
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 3514c9e..5f62714 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -47,9 +47,8 @@ under the License.
 		<dependency>
 			<!-- Used by maven-dependency-plugin -->
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
+			<artifactId>flink-sql-avro</artifactId>
 			<version>${project.version}</version>
-			<classifier>sql-jar</classifier>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
@@ -155,9 +154,8 @@ under the License.
 							<artifactItems>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-avro</artifactId>
+									<artifactId>flink-sql-avro</artifactId>
 									<version>${project.version}</version>
-									<classifier>sql-jar</classifier>
 									<type>jar</type>
 								</artifactItem>
 								<artifactItem>
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index f75c92e..6647a16 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -60,7 +60,8 @@ for SQL_JAR in $SQL_JARS_DIR/*.jar; do
     if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \
         ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \
         ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \
-        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/avro"* ]] ; then
       echo "Bad file in JAR: $EXTRACTED_FILE"
       exit 1
     fi
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index b660b6c..9276fff 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -150,37 +150,6 @@ under the License.
 
 	</dependencies>
 
-	<profiles>
-		<!-- Create SQL Client uber jars by default -->
-		<profile>
-			<id>sql-jars</id>
-			<activation>
-				<property>
-					<name>!skipSqlJars</name>
-				</property>
-			</activation>
-			<build>
-				<plugins>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-jar-plugin</artifactId>
-						<executions>
-							<execution>
-								<phase>package</phase>
-								<goals>
-									<goal>jar</goal>
-								</goals>
-								<configuration>
-									<classifier>sql-jar</classifier>
-								</configuration>
-							</execution>
-						</executions>
-					</plugin>
-				</plugins>
-			</build>
-		</profile>
-	</profiles>
-
 	<build>
 		<plugins>
 			<plugin>
diff --git a/flink-formats/flink-sql-avro/pom.xml b/flink-formats/flink-sql-avro/pom.xml
new file mode 100644
index 0000000..2ed2cbb
--- /dev/null
+++ b/flink-formats/flink-sql-avro/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.12-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-sql-avro</artifactId>
+	<name>Flink : Formats : SQL Avro</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-avro</include>
+									<include>org.apache.avro:avro</include>
+									<include>com.fasterxml.jackson.core:jackson-core</include>
+									<include>com.fasterxml.jackson.core:jackson-databind</include>
+									<include>com.fasterxml.jackson.core:jackson-annotations</include>
+									<include>org.apache.commons:commons-compress</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>com.fasterxml.jackson</pattern>
+									<shadedPattern>org.apache.flink.avro.shaded.com.fasterxml.jackson</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>org.apache.commons.compress</pattern>
+									<shadedPattern>org.apache.flink.avro.shaded.org.apache.commons.compress</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
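
A note on the shading choices above: Avro itself is bundled but left unrelocated (the e2e whitelist change above permits org/apache/avro in the jar), presumably so user code compiled against the real org.apache.avro classes keeps working, while jackson and commons-compress are relocated to avoid classpath clashes. A quick post-build sanity check, assuming the jar name this module produces:

    # Should list only relocated jackson / commons-compress classes.
    jar tf flink-sql-avro-1.12-SNAPSHOT.jar | grep '^org/apache/flink/avro/shaded/'
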
diff --git a/flink-formats/flink-sql-avro/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-avro/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7290134
--- /dev/null
+++ b/flink-formats/flink-sql-avro/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,13 @@
+flink-sql-avro
+Copyright 2014-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- org.apache.avro:avro:1.10.0
+- com.fasterxml.jackson.core:jackson-core:2.10.1
+- com.fasterxml.jackson.core:jackson-databind:2.10.1
+- com.fasterxml.jackson.core:jackson-annotations:2.10.1
+- org.apache.commons:commons-compress:1.20
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 3d6699b..9caf544 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -81,6 +81,7 @@ under the License.
 			<modules>
 				<module>flink-sql-orc</module>
 				<module>flink-sql-parquet</module>
+				<module>flink-sql-avro</module>
 			</modules>
 		</profile>
 	</profiles>