You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:30 UTC

[flink] 06/07: [FLINK-18192] Upgrade avro to 1.10

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 115f85220abb89f3b9d179e0ea2e3b6a3be4c7af
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:28:25 2020 +0200

    [FLINK-18192] Upgrade avro to 1.10
    
    This commit upgrades the default version of avro that flink-avro will use. It should be possible to downgrade the avro version in a user job as the binary format is compatible and we do not expose any dependencies on avro in the API.
    
    Additionally this commit fixes handling of logical types: time-micros and timestamp-micros as well as interpretation of timestamp-millis in the AvroRowDataDeserializationSchema.
---
 flink-connectors/flink-connector-hive/pom.xml      | 12 ++++
 .../src/main/avro/ComplexPayloadAvro.avsc          |  2 +-
 .../RegistryAvroRowDataSeDeSchemaTest.java         |  2 +-
 .../formats/avro/AvroRowDeserializationSchema.java | 18 ++++++
 .../formats/avro/AvroRowSerializationSchema.java   |  3 +-
 .../formats/avro/AvroToRowDataConverters.java      | 17 ++++--
 .../formats/avro/RowDataToAvroConverters.java      |  2 +-
 .../avro/typeutils/AvroSchemaConverter.java        |  5 +-
 .../avro/utils/AvroKryoSerializerUtils.java        | 70 ---------------------
 .../flink/formats/avro/AvroOutputFormatITCase.java | 13 ++--
 .../flink/formats/avro/AvroOutputFormatTest.java   | 14 +++--
 .../formats/avro/AvroRecordInputFormatTest.java    | 19 +++---
 .../avro/AvroRowDataDeSerializationSchemaTest.java | 17 +++---
 .../avro/AvroSplittableInputFormatTest.java        | 25 ++++----
 .../flink/formats/avro/EncoderDecoderTest.java     | 13 ++--
 .../RegistryAvroDeserializationSchemaTest.java     |  3 -
 .../avro/typeutils/AvroTypeExtractionTest.java     | 71 ++++++++++++++--------
 .../flink/formats/avro/utils/AvroTestUtils.java    | 31 +++++-----
 .../formats/avro/utils/TestDataGenerator.java      | 14 ++---
 .../flink/table/runtime/batch/AvroTypesITCase.java | 45 +++++++-------
 .../flink-avro/src/test/resources/avro/user.avsc   |  2 +-
 pom.xml                                            |  5 +-
 22 files changed, 196 insertions(+), 207 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 8b38b97..e0c9b986 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<hiverunner.version>4.0.0</hiverunner.version>
 		<reflections.version>0.9.8</reflections.version>
 		<derby.version>10.10.2.0</derby.version>
+		<hive.avro.version>1.8.2</hive.avro.version>
 	</properties>
 
 	<!-- Overwrite hadoop dependency management from flink-parent to use locally defined Hadoop version -->
@@ -202,6 +203,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!--We downgrade the version of avro for tests. We do that because we test Hive's built-in
+			formats, which do not work with newer avro versions. We do not downgrade the version
+			permanently, because it should be safe to use hive as a catalog with newer avro
+			versions. -->
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>${hive.avro.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-hadoop-fs</artifactId>
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
index 15a801e..445f5af 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
@@ -22,7 +22,7 @@
      {
         "name": "eventTime",
         "type": "long",
-        "default": ""
+        "default": -1
      },
      {
         "name": "stringList",
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
index 1cf348a..dbcec71 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
@@ -148,7 +148,7 @@ public class RegistryAvroRowDataSeDeSchemaTest {
 		byte[] serialized = serializer.serialize(oriData);
 		RowData rowData = deserializer.deserialize(serialized);
 		assertThat(rowData.getArity(), equalTo(schema.getFields().size()));
-		assertEquals(address.getNum(), Integer.valueOf(rowData.getInt(0)));
+		assertEquals(address.getNum(), rowData.getInt(0));
 		assertEquals(address.getStreet(), rowData.getString(1).toString());
 		if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
 			assertEquals(address.getCity(), rowData.getString(2).toString());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5453f67..65d9ede 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -55,6 +55,10 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -320,6 +324,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			// adopted from Apache Calcite
 			final long t = (long) value * 86400000L;
 			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else if (object instanceof LocalDate) {
+			long t = ((LocalDate) object).toEpochDay() * 86400000L;
+			millis = t - (long) LOCAL_TZ.getOffset(t);
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertDate(object);
 		} else {
@@ -334,6 +341,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			millis = (Integer) object;
 		} else if (object instanceof Long) {
 			millis = (Long) object / 1000L;
+		} else if (object instanceof LocalTime) {
+			millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTime(object);
 		} else {
@@ -357,6 +366,15 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			} else {
 				millis = (Long) object;
 			}
+		} else if (object instanceof Instant) {
+			Instant instant = (Instant) object;
+			int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
+
+			long seconds = instant.getEpochSecond() - offsetMillis / 1000;
+			int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
+			Timestamp timestamp = new Timestamp(seconds * 1000L);
+			timestamp.setNanos(nanos);
+			return timestamp;
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTimestamp(object);
 		} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 7503e8e..6bc6af1 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -289,8 +289,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.date()) {
 			// adopted from Apache Calcite
-			final long time = date.getTime();
-			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			final long converted = toEpochMillis(date);
 			return (int) (converted / 86400000L);
 		} else {
 			throw new RuntimeException("Unsupported date type.");
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index b340cba..346ccf6 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -40,7 +40,10 @@ import org.apache.avro.generic.IndexedRecord;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
-import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -193,6 +196,8 @@ public class AvroToRowDataConverters {
 		final long millis;
 		if (object instanceof Long) {
 			millis = (Long) object;
+		} else if (object instanceof Instant) {
+			millis = ((Instant) object).toEpochMilli();
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
@@ -202,12 +207,14 @@ public class AvroToRowDataConverters {
 					"Unexpected object type for TIMESTAMP logical type. Received: " + object);
 			}
 		}
-		return toTimestampData(millis);
+		return TimestampData.fromEpochMillis(millis);
 	}
 
 	private static int convertToDate(Object object) {
 		if (object instanceof Integer) {
 			return (Integer) object;
+		} else if (object instanceof LocalDate) {
+			return (int) ((LocalDate) object).toEpochDay();
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
@@ -218,14 +225,12 @@ public class AvroToRowDataConverters {
 		}
 	}
 
-	private static TimestampData toTimestampData(long timeZoneMills) {
-		return TimestampData.fromTimestamp(new Timestamp(timeZoneMills));
-	}
-
 	private static int convertToTime(Object object) {
 		final int millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (object instanceof LocalTime) {
+			millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
 		} else {
 			JodaConverter jodaConverter = JodaConverter.getConverter();
 			if (jodaConverter != null) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index 0afb2d26..278956b 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -119,7 +119,7 @@ public class RowDataToAvroConverters {
 			converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
 			break;
 		case TIMESTAMP_WITHOUT_TIME_ZONE:
-			converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+			converter = (schema, object) -> ((TimestampData) object).toInstant().toEpochMilli();
 			break;
 		case DECIMAL:
 			converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index e63a623..d1438eb 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -155,9 +155,8 @@ public class AvroSchemaConverter {
 				return Types.INT;
 			case LONG:
 				// logical timestamp type
-				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-					return Types.SQL_TIMESTAMP;
-				} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+				if (schema.getLogicalType() == LogicalTypes.timestampMillis() ||
+						schema.getLogicalType() == LogicalTypes.timestampMicros()) {
 					return Types.SQL_TIMESTAMP;
 				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
 					return Types.SQL_TIME;
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index ba125c2..5744abc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -33,11 +33,6 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.joda.time.Chronology;
-import org.joda.time.DateTimeZone;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-import org.joda.time.chrono.ISOChronology;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -108,69 +103,4 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			return sParser.parse(schemaAsString);
 		}
 	}
-
-	/**
-	 * Avro logical types use JodaTime's LocalDate but Kryo is unable to serialize it
-	 * properly (esp. visible after calling the toString() method).
-	 */
-	public static class JodaLocalDateSerializer extends Serializer<LocalDate> {
-
-		public JodaLocalDateSerializer() {
-			setImmutable(true);
-		}
-
-		@Override
-		public void write(Kryo kryo, Output output, LocalDate localDate) {
-			output.writeInt(localDate.getYear());
-			output.writeInt(localDate.getMonthOfYear());
-			output.writeInt(localDate.getDayOfMonth());
-
-			final Chronology chronology = localDate.getChronology();
-			if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
-				throw new RuntimeException("Unsupported chronology: " + chronology);
-			}
-		}
-
-		@Override
-		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> aClass) {
-			final int y = input.readInt();
-			final int m = input.readInt();
-			final int d = input.readInt();
-
-			return new LocalDate(
-				y,
-				m,
-				d,
-				null);
-		}
-	}
-
-	/**
-	 * Avro logical types use JodaTime's LocalTime but Kryo is unable to serialize it
-	 * properly (esp. visible after calling the toString() method).
-	 */
-	public static class JodaLocalTimeSerializer extends Serializer<LocalTime> {
-
-		@Override
-		public void write(Kryo kryo, Output output, LocalTime object) {
-			final int time = object.getMillisOfDay();
-			output.writeInt(time, true);
-
-			final Chronology chronology = object.getChronology();
-			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
-				throw new RuntimeException("Unsupported chronology: " + chronology);
-			}
-		}
-
-		@Override
-		public LocalTime read(Kryo kryo, Input input, Class<LocalTime> type) {
-			final int time = input.readInt(true);
-			return new LocalTime(time, ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
-		}
-
-		@Override
-		public LocalTime copy(Kryo kryo, LocalTime original) {
-			return new LocalTime(original);
-		}
-	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index c0ef02c..6d97b54 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -32,14 +32,15 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Assert;
 
 import java.io.File;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -158,9 +159,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index 47535e9..2b843fb 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -44,6 +41,10 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -172,9 +173,10 @@ public class AvroOutputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index ebb584d..319ff34 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -48,9 +48,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,6 +58,10 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -139,9 +140,9 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456L);
-		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user1.setTypeTimestampMicros(123456L);
+		user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		// 20.00
 		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		// 20.00
@@ -170,9 +171,9 @@ public class AvroRecordInputFormatTest {
 				.setTypeBytes(ByteBuffer.allocate(10))
 				.setTypeDate(LocalDate.parse("2014-03-01"))
 				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-				.setTypeTimeMicros(123456)
-				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-				.setTypeTimestampMicros(123456L)
+				.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+				.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 				// 20.00
 				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 89fcceb..370396f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.generated.JodaTimeRecord;
+import org.apache.flink.formats.avro.generated.LogicalTimeRecord;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -35,15 +35,15 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -164,11 +164,12 @@ public class AvroRowDataDeSerializationSchemaTest {
 
 	@Test
 	public void testSpecificType() throws Exception {
-		JodaTimeRecord record = new JodaTimeRecord();
-		record.setTypeTimestampMillis(DateTime.parse("2010-06-30T01:20:20"));
+		LogicalTimeRecord record = new LogicalTimeRecord();
+		Instant timestamp = Instant.parse("2010-06-30T01:20:20Z");
+		record.setTypeTimestampMillis(timestamp);
 		record.setTypeDate(LocalDate.parse("2014-03-01"));
 		record.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		SpecificDatumWriter<JodaTimeRecord> datumWriter = new SpecificDatumWriter<>(JodaTimeRecord.class);
+		SpecificDatumWriter<LogicalTimeRecord> datumWriter = new SpecificDatumWriter<>(LogicalTimeRecord.class);
 		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
 		Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
 		datumWriter.write(record, encoder);
@@ -191,7 +192,7 @@ public class AvroRowDataDeSerializationSchemaTest {
 		byte[] output = serializationSchema.serialize(rowData);
 		RowData rowData2 = deserializationSchema.deserialize(output);
 		Assert.assertEquals(rowData, rowData2);
-		Assert.assertEquals("2010-06-30T01:20:20", rowData.getTimestamp(0, 3).toString());
+		Assert.assertEquals(timestamp, rowData.getTimestamp(0, 3).toInstant());
 		Assert.assertEquals("2014-03-01", DataFormatConverters.LocalDateConverter.INSTANCE.toExternal(
 				rowData.getInt(1)).toString());
 		Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index e78b3b2..39b5db6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.flink.formats.avro.generated.User;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +40,10 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
@@ -117,9 +118,9 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456L);
-		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user1.setTypeTimestampMicros(123456L);
+		user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		// 20.00
 		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		// 20.00
@@ -148,9 +149,9 @@ public class AvroSplittableInputFormatTest {
 				.setTypeBytes(ByteBuffer.allocate(10))
 				.setTypeDate(LocalDate.parse("2014-03-01"))
 				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-				.setTypeTimeMicros(123456)
-				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-				.setTypeTimestampMicros(123456L)
+				.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+				.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 				// 20.00
 				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				// 20.00
@@ -183,9 +184,9 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456L);
-			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
-			user.setTypeTimestampMicros(123456L);
+			user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+			user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 			// 20.00
 			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index c945d25..835dc94 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -29,9 +29,6 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -40,6 +37,10 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -270,9 +271,9 @@ public class EncoderDecoderTest {
 			ByteBuffer.wrap(b),
 			LocalDate.parse("2014-03-01"),
 			LocalTime.parse("12:12:12"),
-			123456L,
-			DateTime.parse("2014-03-01T12:12:12.321Z"),
-			123456L,
+			LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+			Instant.parse("2014-03-01T12:12:12.321Z"),
+			Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
 			new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
index 46419e9..2cd7c55 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -73,9 +73,6 @@ public class RegistryAvroDeserializationSchemaTest {
 			Address.getClassSchema()));
 		assertEquals(address.getNum(), genericRecord.get("num"));
 		assertEquals(address.getStreet(), genericRecord.get("street").toString());
-		assertNull(genericRecord.get("city"));
-		assertNull(genericRecord.get("state"));
-		assertNull(genericRecord.get("zip"));
 		assertNull(genericRecord.get("country"));
 	}
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index ccba0a5..7618452 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -89,19 +89,30 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
-			"\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
-			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
-			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
-			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
-			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
-			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
-			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
-			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
-			"\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
-			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
-			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
-			"\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, " +
+			"\"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", " +
+			"\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", " +
+			"\"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, " +
+			"\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+			"\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, " +
+			"\"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+			"\"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n";
 	}
 
 	@Test
@@ -123,17 +134,29 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
-			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
-			"\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
-			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
-			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
-			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
-			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
-			"\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
-			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
-			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
-			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null," +
+			" \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null," +
+			" \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"]," +
+			" \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\"," +
+			" \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null," +
+			" \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\"," +
+			" \"state\": \"London\", \"zip\": \"NW1 6XE\"}," +
+			" \"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+			"\"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, " +
+			"\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+			"\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", " +
+			"\"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, " +
+			"\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+			"\"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+			"\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
 			"\"type_decimal_fixed\": [7, -48]}\n";
 
 	}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 30e51bc..dcc0fb3 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -33,9 +33,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -44,6 +41,10 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -90,9 +91,9 @@ public final class AvroTestUtils {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			// byte array must contain the two's-complement representation of the
 			// unscaled integer value in big-endian byte order
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
@@ -121,11 +122,9 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, new Time(123)); // we truncate micros
+		rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		Timestamp timestampMicros = new Timestamp(0);
-		timestampMicros.setNanos(123_456_000);
-		rowUser.setField(20, timestampMicros);
+		rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
@@ -197,9 +196,9 @@ public final class AvroTestUtils {
 		user.put("type_bytes", ByteBuffer.allocate(10));
 		user.put("type_date", LocalDate.parse("2014-03-01"));
 		user.put("type_time_millis", LocalTime.parse("12:12:12"));
-		user.put("type_time_micros", 123456L);
-		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
-		user.put("type_timestamp_micros", 123456L);
+		user.put("type_time_micros", LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+		user.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
+		user.put("type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
 		user.put("type_decimal_bytes",
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 		user.put("type_decimal_fixed",
@@ -226,11 +225,9 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, new Time(123)); // we truncate micros
+		rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		Timestamp timestampMicros = new Timestamp(0);
-		timestampMicros.setNanos(123_456_000);
-		rowUser.setField(20, timestampMicros);
+		rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index db0452c..aab530b 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -25,12 +25,12 @@ import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.generated.User;
 
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,9 +61,9 @@ public class TestDataGenerator {
 				generateRandomBytes(rnd),
 				LocalDate.parse("2014-03-01"),
 				LocalTime.parse("12:12:12"),
-				123456L,
-				DateTime.parse("2014-03-01T12:12:12.321Z"),
-				123456L,
+				LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+				Instant.parse("2014-03-01T12:12:12.321Z"),
+				Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
 				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
 				new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 	}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 4d93f6e..c8c71d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
-import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,15 +35,16 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -86,9 +86,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -107,12 +107,13 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeMap(new HashMap<>())
 			.setTypeFixed(new Fixed16())
 			.setTypeUnion(null)
-			.setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeNested(null)
+			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -135,9 +136,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeBytes(ByteBuffer.allocate(10))
 			.setTypeDate(LocalDate.parse("2014-03-01"))
 			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
-			.setTypeTimeMicros(123456)
-			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
-			.setTypeTimestampMicros(123456L)
+			.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+			.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
 			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
@@ -151,9 +152,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 	@Test
 	public void testAvroToRow() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
-		env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
-		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance().useBlinkPlanner().build());
 
 		Table t = tEnv.fromDataStream(testData(env));
 		Table result = t.select($("*"));
@@ -166,19 +167,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 		String expected =
 			"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
 			"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
-			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
-			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,00:00:00.123456," +
+			"12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null\n" +
 			"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
 			// TODO we should get an Avro record here instead of a nested row. This should be fixed
 			// with FLIP-136
-			"Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
+			"Berlin,42,Berlin,Bakerstreet,12049,null,null,00:00:00.123456,12:12:12,1970-01-01T00:00:00.123456Z," +
 			"2014-03-01T12:12:12.321Z,null\n" +
 			"yellow,null,Terminator,[false],[world],false," +
 			"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
 			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
-			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
-			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,00:00:00.123456," +
+			"12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null";
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 4d59e5b..aa7f638 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -102,7 +102,7 @@
 },
   {"namespace": "org.apache.flink.formats.avro.generated",
    "type": "record",
-   "name": "JodaTimeRecord",
+   "name": "LogicalTimeRecord",
    "fields": [
        {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
diff --git a/pom.xml b/pom.xml
index 7afb7a9..1ad4a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@ under the License.
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.10.1</jackson.version>
 		<prometheus.version>0.8.1</prometheus.version>
-		<avro.version>1.8.2</avro.version>
+		<avro.version>1.10.0</avro.version>
 		<javax.activation.api.version>1.2.0</javax.activation.api.version>
 		<jaxb.api.version>2.3.1</jaxb.api.version>
 		<junit.version>4.12</junit.version>
@@ -468,7 +468,8 @@ under the License.
 				<version>3.4.0</version>
 			</dependency>
 
-			<!-- Make sure we use a consistent avro version between Flink and Hadoop -->
+			<!-- We no longer align the avro version with the version bundled in Hadoop.
+			 Users might need to downgrade the avro version for a particular Hadoop version. -->
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>