You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:28 UTC

[flink] 04/07: [hotfix] Fix time-micros and timestamp-micros handling

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ac804852237aa36a0477757a3cdacd9b69e7dc5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 18:13:53 2020 +0200

    [hotfix] Fix time-micros and timestamp-micros handling
---
 .../formats/avro/AvroRowDeserializationSchema.java | 24 +++++++++++++---
 .../formats/avro/AvroRowSerializationSchema.java   | 32 ++++++++++++++++++----
 .../avro/typeutils/AvroSchemaConverter.java        |  2 ++
 .../flink/formats/avro/AvroOutputFormatITCase.java |  2 +-
 .../flink/formats/avro/AvroOutputFormatTest.java   |  2 +-
 .../formats/avro/AvroRecordInputFormatTest.java    |  2 +-
 .../avro/AvroSplittableInputFormatTest.java        |  4 +--
 .../flink/formats/avro/EncoderDecoderTest.java     |  2 +-
 .../avro/typeutils/AvroSchemaConverterTest.java    |  6 ++--
 .../flink/formats/avro/utils/AvroTestUtils.java    | 16 +++++++----
 .../formats/avro/utils/TestDataGenerator.java      |  2 +-
 .../flink-avro/src/test/resources/avro/user.avsc   |  2 +-
 12 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index fb7a74e..5453f67 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -75,12 +75,13 @@ import java.util.TimeZone;
  */
 @PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
-
 	/**
 	 * Used for time conversions into SQL types.
 	 */
 	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
 
+	private static final long MICROS_PER_SECOND = 1_000_000L;
+
 	/**
 	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
@@ -294,7 +295,9 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 				return object;
 			case LONG:
 				if (info == Types.SQL_TIMESTAMP) {
-					return convertToTimestamp(object);
+					return convertToTimestamp(object, schema.getLogicalType() == LogicalTypes.timestampMicros());
+				} else if (info == Types.SQL_TIME) {
+					return convertToTime(object);
 				}
 				return object;
 			case FLOAT:
@@ -329,6 +332,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (object instanceof Long) {
+			millis = (Long) object / 1000L;
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTime(object);
 		} else {
@@ -337,10 +342,21 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
 
-	private Timestamp convertToTimestamp(Object object) {
+	private Timestamp convertToTimestamp(Object object, boolean isMicros) {
 		final long millis;
 		if (object instanceof Long) {
-			millis = (Long) object;
+			if (isMicros) {
+				long micros = (Long) object;
+				int offsetMillis = LOCAL_TZ.getOffset(Math.floorDiv(micros, 1000L));
+				long localMicros = micros - offsetMillis * 1000L; // undo the local-zone shift added on serialization
+				long seconds = Math.floorDiv(localMicros, MICROS_PER_SECOND); // floor keeps nanos >= 0 before 1970
+				int nanos = (int) Math.floorMod(localMicros, MICROS_PER_SECOND) * 1000;
+				Timestamp timestamp = new Timestamp(seconds * 1000L);
+				timestamp.setNanos(nanos);
+				return timestamp;
+			} else {
+				millis = (Long) object;
+			}
 		} else if (jodaConverter != null) {
 			millis = jodaConverter.convertTimestamp(object);
 		} else {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 30f9754..7503e8e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -248,9 +248,9 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 				} else if (object instanceof LocalDate) {
 					return convertFromDate(schema, Date.valueOf((LocalDate) object));
 				} else if (object instanceof Time) {
-					return convertFromTime(schema, (Time) object);
+					return convertFromTimeMillis(schema, (Time) object);
 				} else if (object instanceof LocalTime) {
-					return convertFromTime(schema, Time.valueOf((LocalTime) object));
+					return convertFromTimeMillis(schema, Time.valueOf((LocalTime) object));
 				}
 				return object;
 			case LONG:
@@ -259,6 +259,8 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 					return convertFromTimestamp(schema, (Timestamp) object);
 				} else if (object instanceof LocalDateTime) {
 					return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
+				} else if (object instanceof Time) {
+					return convertFromTimeMicros(schema, (Time) object);
 				}
 				return object;
 			case FLOAT:
@@ -295,29 +297,49 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		}
 	}
 
-	private int convertFromTime(Schema schema, Time date) {
+	private int convertFromTimeMillis(Schema schema, Time date) {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.timeMillis()) {
 			// adopted from Apache Calcite
-			final long time = date.getTime();
-			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			final long converted = toEpochMillis(date);
 			return (int) (converted % 86400000L);
 		} else {
 			throw new RuntimeException("Unsupported time type.");
 		}
 	}
 
+	private long convertFromTimeMicros(Schema schema, Time date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timeMicros()) {
+			// adopted from Apache Calcite
+			final long converted = toEpochMillis(date);
+			return (converted % 86400000L) * 1000L;
+		} else {
+			throw new RuntimeException("Unsupported time type.");
+		}
+	}
+
 	private long convertFromTimestamp(Schema schema, Timestamp date) {
 		final LogicalType logicalType = schema.getLogicalType();
 		if (logicalType == LogicalTypes.timestampMillis()) {
 			// adopted from Apache Calcite
 			final long time = date.getTime();
 			return time + (long) LOCAL_TZ.getOffset(time);
+		} else if (logicalType == LogicalTypes.timestampMicros()) {
+			long millis = date.getTime();
+			long micros = millis * 1000 + (date.getNanos() % 1_000_000 / 1000);
+			long offset = LOCAL_TZ.getOffset(millis) * 1000L;
+			return micros + offset;
 		} else {
 			throw new RuntimeException("Unsupported timestamp type.");
 		}
 	}
 
+	private long toEpochMillis(java.util.Date date) {
+		final long time = date.getTime();
+		return time + (long) LOCAL_TZ.getOffset(time);
+	}
+
 	private void writeObject(ObjectOutputStream outputStream) throws IOException {
 		outputStream.writeObject(recordClazz);
 		outputStream.writeObject(schemaString); // support for null
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 332d18e..e63a623 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
 				// logical timestamp type
 				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
 					return Types.SQL_TIMESTAMP;
+				} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+					return Types.SQL_TIMESTAMP;
 				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
 					return Types.SQL_TIME;
 				}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index dd901d0..c0ef02c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -158,7 +158,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index f07c36b..47535e9 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -172,7 +172,7 @@ public class AvroOutputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 84849a6..ebb584d 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -139,7 +139,7 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimeMicros(123456L);
 		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user1.setTypeTimestampMicros(123456L);
 		// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index fee81a8..e78b3b2 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -117,7 +117,7 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeBytes(ByteBuffer.allocate(10));
 		user1.setTypeDate(LocalDate.parse("2014-03-01"));
 		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimeMicros(123456L);
 		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user1.setTypeTimestampMicros(123456L);
 		// 20.00
@@ -183,7 +183,7 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeBytes(ByteBuffer.allocate(10));
 			user.setTypeDate(LocalDate.parse("2014-03-01"));
 			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
-			user.setTypeTimeMicros(123456);
+			user.setTypeTimeMicros(123456L);
 			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
 			user.setTypeTimestampMicros(123456L);
 			// 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 49ef985..c945d25 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -270,7 +270,7 @@ public class EncoderDecoderTest {
 			ByteBuffer.wrap(b),
 			LocalDate.parse("2014-03-01"),
 			LocalTime.parse("12:12:12"),
-			123456,
+			123456L,
 			DateTime.parse("2014-03-01T12:12:12.321Z"),
 			123456L,
 			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 98efd6e..c1cada7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -151,9 +151,9 @@ public class AvroSchemaConverterTest {
 			Types.PRIMITIVE_ARRAY(Types.BYTE),
 			Types.SQL_DATE,
 			Types.SQL_TIME,
-			Types.INT,
+			Types.SQL_TIME,
+			Types.SQL_TIMESTAMP,
 			Types.SQL_TIMESTAMP,
-			Types.LONG,
 			Types.BIG_DEC,
 			Types.BIG_DEC);
 
@@ -193,7 +193,7 @@ public class AvroSchemaConverterTest {
 				DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
 				DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
 				DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
-				DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
+				DataTypes.FIELD("type_time_micros", DataTypes.TIME(6).notNull()),
 				DataTypes.FIELD("type_timestamp_millis",
 						DataTypes.TIMESTAMP(3).notNull()),
 				DataTypes.FIELD("type_timestamp_micros",
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 9d77f32..30e51bc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -121,9 +121,11 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, 123456);
+		rowUser.setField(18, new Time(123)); // we truncate micros
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		rowUser.setField(20, 123456L);
+		Timestamp timestampMicros = new Timestamp(0);
+		timestampMicros.setNanos(123_456_000);
+		rowUser.setField(20, timestampMicros);
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
@@ -156,7 +158,7 @@ public final class AvroTestUtils {
 			"{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
 			"\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
 			"{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
-			"\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+			"\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
 			"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
 			"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
 			"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
@@ -195,7 +197,7 @@ public final class AvroTestUtils {
 		user.put("type_bytes", ByteBuffer.allocate(10));
 		user.put("type_date", LocalDate.parse("2014-03-01"));
 		user.put("type_time_millis", LocalTime.parse("12:12:12"));
-		user.put("type_time_micros", 123456);
+		user.put("type_time_micros", 123456L);
 		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
 		user.put("type_timestamp_micros", 123456L);
 		user.put("type_decimal_bytes",
@@ -224,9 +226,11 @@ public final class AvroTestUtils {
 		rowUser.setField(15, new byte[10]);
 		rowUser.setField(16, Date.valueOf("2014-03-01"));
 		rowUser.setField(17, Time.valueOf("12:12:12"));
-		rowUser.setField(18, 123456);
+		rowUser.setField(18, new Time(123)); // we truncate micros
 		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
-		rowUser.setField(20, 123456L);
+		Timestamp timestampMicros = new Timestamp(0);
+		timestampMicros.setNanos(123_456_000);
+		rowUser.setField(20, timestampMicros);
 		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
 		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index a4c5bf8..db0452c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -61,7 +61,7 @@ public class TestDataGenerator {
 				generateRandomBytes(rnd),
 				LocalDate.parse("2014-03-01"),
 				LocalTime.parse("12:12:12"),
-				123456,
+				123456L,
 				DateTime.parse("2014-03-01T12:12:12.321Z"),
 				123456L,
 				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index cb9e9b2..4d59e5b 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -34,7 +34,7 @@
      {"name": "type_bytes", "type": "bytes"},
      {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
      {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}},
-     {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}},
+     {"name": "type_time_micros", "type": {"type": "long", "logicalType": "time-micros"}},
      {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
      {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
      {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},