You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:28 UTC
[flink] 04/07: [hotfix] Fix time-micros and timestamp-micros handling
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6ac804852237aa36a0477757a3cdacd9b69e7dc5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 18:13:53 2020 +0200
[hotfix] Fix time-micros and timestamp-micros handling
---
.../formats/avro/AvroRowDeserializationSchema.java | 24 +++++++++++++---
.../formats/avro/AvroRowSerializationSchema.java | 32 ++++++++++++++++++----
.../avro/typeutils/AvroSchemaConverter.java | 2 ++
.../flink/formats/avro/AvroOutputFormatITCase.java | 2 +-
.../flink/formats/avro/AvroOutputFormatTest.java | 2 +-
.../formats/avro/AvroRecordInputFormatTest.java | 2 +-
.../avro/AvroSplittableInputFormatTest.java | 4 +--
.../flink/formats/avro/EncoderDecoderTest.java | 2 +-
.../avro/typeutils/AvroSchemaConverterTest.java | 6 ++--
.../flink/formats/avro/utils/AvroTestUtils.java | 16 +++++++----
.../formats/avro/utils/TestDataGenerator.java | 2 +-
.../flink-avro/src/test/resources/avro/user.avsc | 2 +-
12 files changed, 70 insertions(+), 26 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index fb7a74e..5453f67 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -75,12 +75,13 @@ import java.util.TimeZone;
*/
@PublicEvolving
public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
-
/**
* Used for time conversions into SQL types.
*/
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+ private static final long MICROS_PER_SECOND = 1_000_000L;
+
/**
* Avro record class for deserialization. Might be null if record class is not available.
*/
@@ -294,7 +295,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
return object;
case LONG:
if (info == Types.SQL_TIMESTAMP) {
- return convertToTimestamp(object);
+ return convertToTimestamp(object, schema.getLogicalType() == LogicalTypes.timestampMicros());
+ } else if (info == Types.SQL_TIME) {
+ return convertToTime(object);
}
return object;
case FLOAT:
@@ -329,6 +332,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
final long millis;
if (object instanceof Integer) {
millis = (Integer) object;
+ } else if (object instanceof Long) {
+ millis = (Long) object / 1000L;
} else if (jodaConverter != null) {
millis = jodaConverter.convertTime(object);
} else {
@@ -337,10 +342,21 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
return new Time(millis - LOCAL_TZ.getOffset(millis));
}
- private Timestamp convertToTimestamp(Object object) {
+ private Timestamp convertToTimestamp(Object object, boolean isMicros) {
final long millis;
if (object instanceof Long) {
- millis = (Long) object;
+ if (isMicros) {
+ long micros = (Long) object;
+ int offsetMillis = LOCAL_TZ.getOffset(micros / 1000L);
+
+ long seconds = micros / MICROS_PER_SECOND - offsetMillis / 1000;
+ int nanos = ((int) (micros % MICROS_PER_SECOND)) * 1000 - offsetMillis % 1000 * 1000;
+ Timestamp timestamp = new Timestamp(seconds * 1000L);
+ timestamp.setNanos(nanos);
+ return timestamp;
+ } else {
+ millis = (Long) object;
+ }
} else if (jodaConverter != null) {
millis = jodaConverter.convertTimestamp(object);
} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 30f9754..7503e8e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -248,9 +248,9 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
} else if (object instanceof LocalDate) {
return convertFromDate(schema, Date.valueOf((LocalDate) object));
} else if (object instanceof Time) {
- return convertFromTime(schema, (Time) object);
+ return convertFromTimeMillis(schema, (Time) object);
} else if (object instanceof LocalTime) {
- return convertFromTime(schema, Time.valueOf((LocalTime) object));
+ return convertFromTimeMillis(schema, Time.valueOf((LocalTime) object));
}
return object;
case LONG:
@@ -259,6 +259,8 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
return convertFromTimestamp(schema, (Timestamp) object);
} else if (object instanceof LocalDateTime) {
return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
+ } else if (object instanceof Time) {
+ return convertFromTimeMicros(schema, (Time) object);
}
return object;
case FLOAT:
@@ -295,29 +297,49 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
}
}
- private int convertFromTime(Schema schema, Time date) {
+ private int convertFromTimeMillis(Schema schema, Time date) {
final LogicalType logicalType = schema.getLogicalType();
if (logicalType == LogicalTypes.timeMillis()) {
// adopted from Apache Calcite
- final long time = date.getTime();
- final long converted = time + (long) LOCAL_TZ.getOffset(time);
+ final long converted = toEpochMillis(date);
return (int) (converted % 86400000L);
} else {
throw new RuntimeException("Unsupported time type.");
}
}
+ private long convertFromTimeMicros(Schema schema, Time date) {
+ final LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == LogicalTypes.timeMicros()) {
+ // adopted from Apache Calcite
+ final long converted = toEpochMillis(date);
+ return (converted % 86400000L) * 1000L;
+ } else {
+ throw new RuntimeException("Unsupported time type.");
+ }
+ }
+
private long convertFromTimestamp(Schema schema, Timestamp date) {
final LogicalType logicalType = schema.getLogicalType();
if (logicalType == LogicalTypes.timestampMillis()) {
// adopted from Apache Calcite
final long time = date.getTime();
return time + (long) LOCAL_TZ.getOffset(time);
+ } else if (logicalType == LogicalTypes.timestampMicros()) {
+ long millis = date.getTime();
+ long micros = millis * 1000 + (date.getNanos() % 1_000_000 / 1000);
+ long offset = LOCAL_TZ.getOffset(millis) * 1000L;
+ return micros + offset;
} else {
throw new RuntimeException("Unsupported timestamp type.");
}
}
+ private long toEpochMillis(java.util.Date date) {
+ final long time = date.getTime();
+ return time + (long) LOCAL_TZ.getOffset(time);
+ }
+
private void writeObject(ObjectOutputStream outputStream) throws IOException {
outputStream.writeObject(recordClazz);
outputStream.writeObject(schemaString); // support for null
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 332d18e..e63a623 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return Types.SQL_TIMESTAMP;
+ } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ return Types.SQL_TIMESTAMP;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
return Types.SQL_TIME;
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index dd901d0..c0ef02c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -158,7 +158,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456);
+ user.setTypeTimeMicros(123456L);
user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(123456L);
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index f07c36b..47535e9 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -172,7 +172,7 @@ public class AvroOutputFormatTest {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456);
+ user.setTypeTimeMicros(123456L);
user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(123456L);
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 84849a6..ebb584d 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -139,7 +139,7 @@ public class AvroRecordInputFormatTest {
user1.setTypeBytes(ByteBuffer.allocate(10));
user1.setTypeDate(LocalDate.parse("2014-03-01"));
user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user1.setTypeTimeMicros(123456);
+ user1.setTypeTimeMicros(123456L);
user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(123456L);
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index fee81a8..e78b3b2 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -117,7 +117,7 @@ public class AvroSplittableInputFormatTest {
user1.setTypeBytes(ByteBuffer.allocate(10));
user1.setTypeDate(LocalDate.parse("2014-03-01"));
user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user1.setTypeTimeMicros(123456);
+ user1.setTypeTimeMicros(123456L);
user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(123456L);
// 20.00
@@ -183,7 +183,7 @@ public class AvroSplittableInputFormatTest {
user.setTypeBytes(ByteBuffer.allocate(10));
user.setTypeDate(LocalDate.parse("2014-03-01"));
user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
- user.setTypeTimeMicros(123456);
+ user.setTypeTimeMicros(123456L);
user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(123456L);
// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 49ef985..c945d25 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -270,7 +270,7 @@ public class EncoderDecoderTest {
ByteBuffer.wrap(b),
LocalDate.parse("2014-03-01"),
LocalTime.parse("12:12:12"),
- 123456,
+ 123456L,
DateTime.parse("2014-03-01T12:12:12.321Z"),
123456L,
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 98efd6e..c1cada7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -151,9 +151,9 @@ public class AvroSchemaConverterTest {
Types.PRIMITIVE_ARRAY(Types.BYTE),
Types.SQL_DATE,
Types.SQL_TIME,
- Types.INT,
+ Types.SQL_TIME,
+ Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP,
- Types.LONG,
Types.BIG_DEC,
Types.BIG_DEC);
@@ -193,7 +193,7 @@ public class AvroSchemaConverterTest {
DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
- DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
+ DataTypes.FIELD("type_time_micros", DataTypes.TIME(6).notNull()),
DataTypes.FIELD("type_timestamp_millis",
DataTypes.TIMESTAMP(3).notNull()),
DataTypes.FIELD("type_timestamp_micros",
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 9d77f32..30e51bc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -121,9 +121,11 @@ public final class AvroTestUtils {
rowUser.setField(15, new byte[10]);
rowUser.setField(16, Date.valueOf("2014-03-01"));
rowUser.setField(17, Time.valueOf("12:12:12"));
- rowUser.setField(18, 123456);
+ rowUser.setField(18, new Time(123)); // we truncate micros
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
- rowUser.setField(20, 123456L);
+ Timestamp timestampMicros = new Timestamp(0);
+ timestampMicros.setNanos(123_456_000);
+ rowUser.setField(20, timestampMicros);
rowUser.setField(21, BigDecimal.valueOf(2000, 2));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
@@ -156,7 +158,7 @@ public final class AvroTestUtils {
"{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
"\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
"{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
- "\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+ "\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
@@ -195,7 +197,7 @@ public final class AvroTestUtils {
user.put("type_bytes", ByteBuffer.allocate(10));
user.put("type_date", LocalDate.parse("2014-03-01"));
user.put("type_time_millis", LocalTime.parse("12:12:12"));
- user.put("type_time_micros", 123456);
+ user.put("type_time_micros", 123456L);
user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
user.put("type_timestamp_micros", 123456L);
user.put("type_decimal_bytes",
@@ -224,9 +226,11 @@ public final class AvroTestUtils {
rowUser.setField(15, new byte[10]);
rowUser.setField(16, Date.valueOf("2014-03-01"));
rowUser.setField(17, Time.valueOf("12:12:12"));
- rowUser.setField(18, 123456);
+ rowUser.setField(18, new Time(123)); // we truncate micros
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
- rowUser.setField(20, 123456L);
+ Timestamp timestampMicros = new Timestamp(0);
+ timestampMicros.setNanos(123_456_000);
+ rowUser.setField(20, timestampMicros);
rowUser.setField(21, BigDecimal.valueOf(2000, 2));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index a4c5bf8..db0452c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -61,7 +61,7 @@ public class TestDataGenerator {
generateRandomBytes(rnd),
LocalDate.parse("2014-03-01"),
LocalTime.parse("12:12:12"),
- 123456,
+ 123456L,
DateTime.parse("2014-03-01T12:12:12.321Z"),
123456L,
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index cb9e9b2..4d59e5b 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -34,7 +34,7 @@
{"name": "type_bytes", "type": "bytes"},
{"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
{"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}},
- {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}},
+ {"name": "type_time_micros", "type": {"type": "long", "logicalType": "time-micros"}},
{"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
{"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},