You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:30 UTC
[flink] 06/07: [FLINK-18192] Upgrade avro to 1.10
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 115f85220abb89f3b9d179e0ea2e3b6a3be4c7af
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:28:25 2020 +0200
[FLINK-18192] Upgrade avro to 1.10
This commit upgrades the default version of avro that flink-avro will use. It should be possible to downgrade the avro version in a user job as the binary format is compatible and we do not expose any dependencies on avro in the API.
Additionally this commit fixes handling of logical types: time-micros and timestamp-micros as well as interpretation of timestamp-millis in the AvroRowDataDeserializationSchema.
---
flink-connectors/flink-connector-hive/pom.xml | 12 ++++
.../src/main/avro/ComplexPayloadAvro.avsc | 2 +-
.../RegistryAvroRowDataSeDeSchemaTest.java | 2 +-
.../formats/avro/AvroRowDeserializationSchema.java | 18 ++++++
.../formats/avro/AvroRowSerializationSchema.java | 3 +-
.../formats/avro/AvroToRowDataConverters.java | 17 ++++--
.../formats/avro/RowDataToAvroConverters.java | 2 +-
.../avro/typeutils/AvroSchemaConverter.java | 5 +-
.../avro/utils/AvroKryoSerializerUtils.java | 70 ---------------------
.../flink/formats/avro/AvroOutputFormatITCase.java | 13 ++--
.../flink/formats/avro/AvroOutputFormatTest.java | 14 +++--
.../formats/avro/AvroRecordInputFormatTest.java | 19 +++---
.../avro/AvroRowDataDeSerializationSchemaTest.java | 17 +++---
.../avro/AvroSplittableInputFormatTest.java | 25 ++++----
.../flink/formats/avro/EncoderDecoderTest.java | 13 ++--
.../RegistryAvroDeserializationSchemaTest.java | 3 -
.../avro/typeutils/AvroTypeExtractionTest.java | 71 ++++++++++++++--------
.../flink/formats/avro/utils/AvroTestUtils.java | 31 +++++-----
.../formats/avro/utils/TestDataGenerator.java | 14 ++---
.../flink/table/runtime/batch/AvroTypesITCase.java | 45 +++++++-------
.../flink-avro/src/test/resources/avro/user.avsc | 2 +-
pom.xml | 5 +-
22 files changed, 196 insertions(+), 207 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 8b38b97..e0c9b986 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -39,6 +39,7 @@ under the License.
<hiverunner.version>4.0.0</hiverunner.version>
<reflections.version>0.9.8</reflections.version>
<derby.version>10.10.2.0</derby.version>
+ <hive.avro.version>1.8.2</hive.avro.version>
</properties>
<!-- Overwrite hadoop dependency management from flink-parent to use locally defined Hadoop version -->
@@ -202,6 +203,17 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <!--We downgrade the version of avro for tests. We do that because we test Hive's built-in
+ formats, which do not work with newer avro versions. We do not downgrade the version
+ permanently, because it should be safe to use hive as a catalog with newer avro
+ versions. -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${hive.avro.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
index 15a801e..445f5af 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
@@ -22,7 +22,7 @@
{
"name": "eventTime",
"type": "long",
- "default": ""
+ "default": -1
},
{
"name": "stringList",
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
index 1cf348a..dbcec71 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
@@ -148,7 +148,7 @@ public class RegistryAvroRowDataSeDeSchemaTest {
byte[] serialized = serializer.serialize(oriData);
RowData rowData = deserializer.deserialize(serialized);
assertThat(rowData.getArity(), equalTo(schema.getFields().size()));
- assertEquals(address.getNum(), Integer.valueOf(rowData.getInt(0)));
+ assertEquals(address.getNum(), rowData.getInt(0));
assertEquals(address.getStreet(), rowData.getString(1).toString());
if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
assertEquals(address.getCity(), rowData.getString(2).toString());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5453f67..65d9ede 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -55,6 +55,10 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -320,6 +324,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
// adopted from Apache Calcite
final long t = (long) value * 86400000L;
millis = t - (long) LOCAL_TZ.getOffset(t);
+ } else if (object instanceof LocalDate) {
+ long t = ((LocalDate) object).toEpochDay() * 86400000L;
+ millis = t - (long) LOCAL_TZ.getOffset(t);
} else if (jodaConverter != null) {
millis = jodaConverter.convertDate(object);
} else {
@@ -334,6 +341,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
millis = (Integer) object;
} else if (object instanceof Long) {
millis = (Long) object / 1000L;
+ } else if (object instanceof LocalTime) {
+ millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
} else if (jodaConverter != null) {
millis = jodaConverter.convertTime(object);
} else {
@@ -357,6 +366,15 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
} else {
millis = (Long) object;
}
+ } else if (object instanceof Instant) {
+ Instant instant = (Instant) object;
+ int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
+
+ long seconds = instant.getEpochSecond() - offsetMillis / 1000;
+ int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
+ Timestamp timestamp = new Timestamp(seconds * 1000L);
+ timestamp.setNanos(nanos);
+ return timestamp;
} else if (jodaConverter != null) {
millis = jodaConverter.convertTimestamp(object);
} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 7503e8e..6bc6af1 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -289,8 +289,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
final LogicalType logicalType = schema.getLogicalType();
if (logicalType == LogicalTypes.date()) {
// adopted from Apache Calcite
- final long time = date.getTime();
- final long converted = time + (long) LOCAL_TZ.getOffset(time);
+ final long converted = toEpochMillis(date);
return (int) (converted / 86400000L);
} else {
throw new RuntimeException("Unsupported date type.");
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index b340cba..346ccf6 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -40,7 +40,10 @@ import org.apache.avro.generic.IndexedRecord;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
-import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -193,6 +196,8 @@ public class AvroToRowDataConverters {
final long millis;
if (object instanceof Long) {
millis = (Long) object;
+ } else if (object instanceof Instant) {
+ millis = ((Instant) object).toEpochMilli();
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
@@ -202,12 +207,14 @@ public class AvroToRowDataConverters {
"Unexpected object type for TIMESTAMP logical type. Received: " + object);
}
}
- return toTimestampData(millis);
+ return TimestampData.fromEpochMillis(millis);
}
private static int convertToDate(Object object) {
if (object instanceof Integer) {
return (Integer) object;
+ } else if (object instanceof LocalDate) {
+ return (int) ((LocalDate) object).toEpochDay();
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
@@ -218,14 +225,12 @@ public class AvroToRowDataConverters {
}
}
- private static TimestampData toTimestampData(long timeZoneMills) {
- return TimestampData.fromTimestamp(new Timestamp(timeZoneMills));
- }
-
private static int convertToTime(Object object) {
final int millis;
if (object instanceof Integer) {
millis = (Integer) object;
+ } else if (object instanceof LocalTime) {
+ millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index 0afb2d26..278956b 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -119,7 +119,7 @@ public class RowDataToAvroConverters {
converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+ converter = (schema, object) -> ((TimestampData) object).toInstant().toEpochMilli();
break;
case DECIMAL:
converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index e63a623..d1438eb 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -155,9 +155,8 @@ public class AvroSchemaConverter {
return Types.INT;
case LONG:
// logical timestamp type
- if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
- return Types.SQL_TIMESTAMP;
- } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis() ||
+ schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return Types.SQL_TIMESTAMP;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
return Types.SQL_TIME;
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index ba125c2..5744abc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -33,11 +33,6 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
-import org.joda.time.Chronology;
-import org.joda.time.DateTimeZone;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-import org.joda.time.chrono.ISOChronology;
import java.io.Serializable;
import java.util.LinkedHashMap;
@@ -108,69 +103,4 @@ public class AvroKryoSerializerUtils extends AvroUtils {
return sParser.parse(schemaAsString);
}
}
-
- /**
- * Avro logical types use JodaTime's LocalDate but Kryo is unable to serialize it
- * properly (esp. visible after calling the toString() method).
- */
- public static class JodaLocalDateSerializer extends Serializer<LocalDate> {
-
- public JodaLocalDateSerializer() {
- setImmutable(true);
- }
-
- @Override
- public void write(Kryo kryo, Output output, LocalDate localDate) {
- output.writeInt(localDate.getYear());
- output.writeInt(localDate.getMonthOfYear());
- output.writeInt(localDate.getDayOfMonth());
-
- final Chronology chronology = localDate.getChronology();
- if (chronology != null && !chronology.equals(ISOChronology.getInstanceUTC())) {
- throw new RuntimeException("Unsupported chronology: " + chronology);
- }
- }
-
- @Override
- public LocalDate read(Kryo kryo, Input input, Class<LocalDate> aClass) {
- final int y = input.readInt();
- final int m = input.readInt();
- final int d = input.readInt();
-
- return new LocalDate(
- y,
- m,
- d,
- null);
- }
- }
-
- /**
- * Avro logical types use JodaTime's LocalTime but Kryo is unable to serialize it
- * properly (esp. visible after calling the toString() method).
- */
- public static class JodaLocalTimeSerializer extends Serializer<LocalTime> {
-
- @Override
- public void write(Kryo kryo, Output output, LocalTime object) {
- final int time = object.getMillisOfDay();
- output.writeInt(time, true);
-
- final Chronology chronology = object.getChronology();
- if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
- throw new RuntimeException("Unsupported chronology: " + chronology);
- }
- }
-
- @Override
- public LocalTime read(Kryo kryo, Input input, Class<LocalTime> type) {
- final int time = input.readInt(true);
- return new LocalTime(time, ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
- }
-
- @Override
- public LocalTime copy(Kryo kryo, LocalTime original) {
- return new LocalTime(original);
- }
- }
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index c0ef02c..6d97b54 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -32,14 +32,15 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.Assert;
import java.io.File;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -158,9 +159,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456L);
- user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
- user.setTypeTimestampMicros(123456L);
+ user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+ user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index 47535e9..2b843fb 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -44,6 +41,10 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
@@ -172,9 +173,10 @@ public class AvroOutputFormatTest {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456L);
- user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
- user.setTypeTimestampMicros(123456L);
+ user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+ user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+
// 20.00
user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index ebb584d..319ff34 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -48,9 +48,6 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,6 +58,10 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -139,9 +140,9 @@ public class AvroRecordInputFormatTest {
user1.setTypeBytes(ByteBuffer.allocate(10));
user1.setTypeDate(LocalDate.parse("2014-03-01"));
user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user1.setTypeTimeMicros(123456L);
- user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
- user1.setTypeTimestampMicros(123456L);
+ user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+ user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// 20.00
@@ -170,9 +171,9 @@ public class AvroRecordInputFormatTest {
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// 20.00
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 89fcceb..370396f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.formats.avro;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.generated.JodaTimeRecord;
+import org.apache.flink.formats.avro.generated.LogicalTimeRecord;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -35,15 +35,15 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -164,11 +164,12 @@ public class AvroRowDataDeSerializationSchemaTest {
@Test
public void testSpecificType() throws Exception {
- JodaTimeRecord record = new JodaTimeRecord();
- record.setTypeTimestampMillis(DateTime.parse("2010-06-30T01:20:20"));
+ LogicalTimeRecord record = new LogicalTimeRecord();
+ Instant timestamp = Instant.parse("2010-06-30T01:20:20Z");
+ record.setTypeTimestampMillis(timestamp);
record.setTypeDate(LocalDate.parse("2014-03-01"));
record.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- SpecificDatumWriter<JodaTimeRecord> datumWriter = new SpecificDatumWriter<>(JodaTimeRecord.class);
+ SpecificDatumWriter<LogicalTimeRecord> datumWriter = new SpecificDatumWriter<>(LogicalTimeRecord.class);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
datumWriter.write(record, encoder);
@@ -191,7 +192,7 @@ public class AvroRowDataDeSerializationSchemaTest {
byte[] output = serializationSchema.serialize(rowData);
RowData rowData2 = deserializationSchema.deserialize(output);
Assert.assertEquals(rowData, rowData2);
- Assert.assertEquals("2010-06-30T01:20:20", rowData.getTimestamp(0, 3).toString());
+ Assert.assertEquals(timestamp, rowData.getTimestamp(0, 3).toInstant());
Assert.assertEquals("2014-03-01", DataFormatConverters.LocalDateConverter.INSTANCE.toExternal(
rowData.getInt(1)).toString());
Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index e78b3b2..39b5db6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -31,9 +31,6 @@ import org.apache.flink.formats.avro.generated.User;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -43,6 +40,10 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
@@ -117,9 +118,9 @@ public class AvroSplittableInputFormatTest {
user1.setTypeBytes(ByteBuffer.allocate(10));
user1.setTypeDate(LocalDate.parse("2014-03-01"));
user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user1.setTypeTimeMicros(123456L);
- user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
- user1.setTypeTimestampMicros(123456L);
+ user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+ user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// 20.00
@@ -148,9 +149,9 @@ public class AvroSplittableInputFormatTest {
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// 20.00
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// 20.00
@@ -183,9 +184,9 @@ public class AvroSplittableInputFormatTest {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456L);
- user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
- user.setTypeTimestampMicros(123456L);
+ user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
+ user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index c945d25..835dc94 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -29,9 +29,6 @@ import org.apache.flink.util.StringUtils;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -40,6 +37,10 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -270,9 +271,9 @@ public class EncoderDecoderTest {
ByteBuffer.wrap(b),
LocalDate.parse("2014-03-01"),
LocalTime.parse("12:12:12"),
- 123456L,
- DateTime.parse("2014-03-01T12:12:12.321Z"),
- 123456L,
+ LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+ Instant.parse("2014-03-01T12:12:12.321Z"),
+ Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
index 46419e9..2cd7c55 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -73,9 +73,6 @@ public class RegistryAvroDeserializationSchemaTest {
Address.getClassSchema()));
assertEquals(address.getNum(), genericRecord.get("num"));
assertEquals(address.getStreet(), genericRecord.get("street").toString());
- assertNull(genericRecord.get("city"));
- assertNull(genericRecord.get("state"));
- assertNull(genericRecord.get("zip"));
assertNull(genericRecord.get("country"));
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index ccba0a5..7618452 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -89,19 +89,30 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.execute("Simple Avro read job");
- expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
- "\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
- "\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
- "\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
- "\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
- "\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
- "\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
- "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
- "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
- "\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
- "\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
- "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
- "\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, " +
+ "\"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+ "\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+ "\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", " +
+ "\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, " +
+ "\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", " +
+ "\"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+ "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+ "\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+ "\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+ "\"type_decimal_fixed\": [7, -48]}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, " +
+ "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+ "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+ "\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, " +
+ "\"type_fixed\": null, \"type_union\": null, " +
+ "\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+ "\"zip\": \"NW1 6XE\"}, " +
+ "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+ "\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+ "\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+ "\"type_decimal_fixed\": [7, -48]}\n";
}
@Test
@@ -123,17 +134,29 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.execute("Simple Avro read job");
- expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
- "\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
- "\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
- "\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
- "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
- "\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
- "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
- "\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
- "\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
- "\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
- "\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null," +
+ " \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null," +
+ " \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"]," +
+ " \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\"," +
+ " \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null," +
+ " \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\"," +
+ " \"state\": \"London\", \"zip\": \"NW1 6XE\"}," +
+ " \"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+ "\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+ "\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
+ "\"type_decimal_fixed\": [7, -48]}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, " +
+ "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+ "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " +
+ "\"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", " +
+ "\"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, " +
+ "\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", " +
+ "\"zip\": \"NW1 6XE\"}, " +
+ "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12, \"type_time_micros\": 00:00:00.123456, " +
+ "\"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+ "\"type_timestamp_micros\": 1970-01-01T00:00:00.123456Z, \"type_decimal_bytes\": \"\\u0007Ð\", " +
"\"type_decimal_fixed\": [7, -48]}\n";
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 30e51bc..dcc0fb3 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -33,9 +33,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -44,6 +41,10 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
@@ -90,9 +91,9 @@ public final class AvroTestUtils {
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// byte array must contain the two's-complement representation of the
// unscaled integer value in big-endian byte order
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
@@ -121,11 +122,9 @@ public final class AvroTestUtils {
rowUser.setField(15, new byte[10]);
rowUser.setField(16, Date.valueOf("2014-03-01"));
rowUser.setField(17, Time.valueOf("12:12:12"));
- rowUser.setField(18, new Time(123)); // we truncate micros
+ rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
- Timestamp timestampMicros = new Timestamp(0);
- timestampMicros.setNanos(123_456_000);
- rowUser.setField(20, timestampMicros);
+ rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
rowUser.setField(21, BigDecimal.valueOf(2000, 2));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
@@ -197,9 +196,9 @@ public final class AvroTestUtils {
user.put("type_bytes", ByteBuffer.allocate(10));
user.put("type_date", LocalDate.parse("2014-03-01"));
user.put("type_time_millis", LocalTime.parse("12:12:12"));
- user.put("type_time_micros", 123456L);
- user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
- user.put("type_timestamp_micros", 123456L);
+ user.put("type_time_micros", LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
+ user.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
+ user.put("type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
user.put("type_decimal_bytes",
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.put("type_decimal_fixed",
@@ -226,11 +225,9 @@ public final class AvroTestUtils {
rowUser.setField(15, new byte[10]);
rowUser.setField(16, Date.valueOf("2014-03-01"));
rowUser.setField(17, Time.valueOf("12:12:12"));
- rowUser.setField(18, new Time(123)); // we truncate micros
+ rowUser.setField(18, Time.valueOf(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)));
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
- Timestamp timestampMicros = new Timestamp(0);
- timestampMicros.setNanos(123_456_000);
- rowUser.setField(20, timestampMicros);
+ rowUser.setField(20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
rowUser.setField(21, BigDecimal.valueOf(2000, 2));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index db0452c..aab530b 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -25,12 +25,12 @@ import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.SimpleUser;
import org.apache.flink.formats.avro.generated.User;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -61,9 +61,9 @@ public class TestDataGenerator {
generateRandomBytes(rnd),
LocalDate.parse("2014-03-01"),
LocalTime.parse("12:12:12"),
- 123456L,
- DateTime.parse("2014-03-01T12:12:12.321Z"),
- 123456L,
+ LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
+ Instant.parse("2014-03-01T12:12:12.321Z"),
+ Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 4d93f6e..c8c71d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
-import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,15 +35,16 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.avro.util.Utf8;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -86,9 +86,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
@@ -107,12 +107,13 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
.setTypeMap(new HashMap<>())
.setTypeFixed(new Fixed16())
.setTypeUnion(null)
- .setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
+ .setTypeNested(null)
+ .setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
@@ -135,9 +136,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
- .setTypeTimeMicros(123456)
- .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
- .setTypeTimestampMicros(123456L)
+ .setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
@@ -151,9 +152,9 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
@Test
public void testAvroToRow() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
- env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(
+ env,
+ EnvironmentSettings.newInstance().useBlinkPlanner().build());
Table t = tEnv.fromDataStream(testData(env));
Table result = t.select($("*"));
@@ -166,19 +167,19 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
String expected =
"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
- "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
- "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,00:00:00.123456," +
+ "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null\n" +
"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
// TODO we should get an Avro record here instead of a nested row. This should be fixed
// with FLIP-136
- "Berlin,42,Berlin,Bakerstreet,12049,null,null,123456,12:12:12.000,123456," +
+ "Berlin,42,Berlin,Bakerstreet,12049,null,null,00:00:00.123456,12:12:12,1970-01-01T00:00:00.123456Z," +
"2014-03-01T12:12:12.321Z,null\n" +
"yellow,null,Terminator,[false],[world],false," +
"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
- "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
- "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,00:00:00.123456," +
+ "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null";
TestBaseUtils.compareResultAsText(results, expected);
}
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 4d59e5b..aa7f638 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -102,7 +102,7 @@
},
{"namespace": "org.apache.flink.formats.avro.generated",
"type": "record",
- "name": "JodaTimeRecord",
+ "name": "LogicalTimeRecord",
"fields": [
{"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
diff --git a/pom.xml b/pom.xml
index 7afb7a9..1ad4a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@ under the License.
<curator.version>2.12.0</curator.version>
<jackson.version>2.10.1</jackson.version>
<prometheus.version>0.8.1</prometheus.version>
- <avro.version>1.8.2</avro.version>
+ <avro.version>1.10.0</avro.version>
<javax.activation.api.version>1.2.0</javax.activation.api.version>
<jaxb.api.version>2.3.1</jaxb.api.version>
<junit.version>4.12</junit.version>
@@ -468,7 +468,8 @@ under the License.
<version>3.4.0</version>
</dependency>
- <!-- Make sure we use a consistent avro version between Flink and Hadoop -->
+ <!-- We no longer align the avro version with the version bundled in Hadoop.
+ Users might need to downgrade the avro version for a particular Hadoop version. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>