You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by nk...@apache.org on 2019/07/08 14:44:05 UTC
[avro] branch master updated: AVRO-2328: Support distinguishing
between LocalDateTime and Instant semantics in timestamps (#525)
This is an automated email from the ASF dual-hosted git repository.
nkollar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new a2098bd AVRO-2328: Support distinguishing between LocalDateTime and Instant semantics in timestamps (#525)
a2098bd is described below
commit a2098bd88361b5ff9298c462a2dccafbb0c2c508
Author: Nándor Kollár <na...@users.noreply.github.com>
AuthorDate: Mon Jul 8 16:43:59 2019 +0200
AVRO-2328: Support distinguishing between LocalDateTime and Instant semantics in timestamps (#525)
New logical type for local timestamp semantic.
---
doc/src/content/xdocs/spec.xml | 24 ++++++
.../main/java/org/apache/avro/LogicalTypes.java | 48 ++++++++++++
.../java/org/apache/avro/data/TimeConversions.java | 64 ++++++++++++++++
.../avro/generic/TestGenericLogicalTypes.java | 89 +++++++++++++++++++++-
.../avro/reflect/TestReflectLogicalTypes.java | 36 +++++++++
.../javacc/org/apache/avro/compiler/idl/idl.jj | 3 +
.../compiler/src/test/idl/input/mr_events.avdl | 1 +
.../compiler/src/test/idl/output/mr_events.avpr | 3 +
8 files changed, 267 insertions(+), 1 deletion(-)
diff --git a/doc/src/content/xdocs/spec.xml b/doc/src/content/xdocs/spec.xml
index 7c01cd0..466cd73 100644
--- a/doc/src/content/xdocs/spec.xml
+++ b/doc/src/content/xdocs/spec.xml
@@ -1514,6 +1514,8 @@ void initFPTable() {
<title>Timestamp (millisecond precision)</title>
<p>
The <code>timestamp-millis</code> logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond.
+ Please note that time zone information is lost when a timestamp is stored this way: upon reading a value back, only the instant can be reconstructed, not the original representation.
+ In practice, such timestamps are typically displayed to users in their local time zones; therefore, they may be displayed differently depending on the execution environment.
</p>
<p>
A <code>timestamp-millis</code> logical type annotates an Avro <code>long</code>, where the long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC.
@@ -1524,6 +1526,8 @@ void initFPTable() {
<title>Timestamp (microsecond precision)</title>
<p>
The <code>timestamp-micros</code> logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one microsecond.
+ Please note that time zone information is lost when a timestamp is stored this way: upon reading a value back, only the instant can be reconstructed, not the original representation.
+ In practice, such timestamps are typically displayed to users in their local time zones; therefore, they may be displayed differently depending on the execution environment.
</p>
<p>
A <code>timestamp-micros</code> logical type annotates an Avro <code>long</code>, where the long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
@@ -1531,6 +1535,26 @@ void initFPTable() {
</section>
<section>
+ <title>Local timestamp (millisecond precision)</title>
+ <p>
+ The <code>local-timestamp-millis</code> logical type represents a timestamp in a local time zone, regardless of which specific time zone is considered local, with a precision of one millisecond.
+ </p>
+ <p>
+ A <code>local-timestamp-millis</code> logical type annotates an Avro <code>long</code>, where the long stores the number of milliseconds from 1 January 1970 00:00:00.000.
+ </p>
+ </section>
+
+ <section>
+ <title>Local timestamp (microsecond precision)</title>
+ <p>
+ The <code>local-timestamp-micros</code> logical type represents a timestamp in a local time zone, regardless of which specific time zone is considered local, with a precision of one microsecond.
+ </p>
+ <p>
+ A <code>local-timestamp-micros</code> logical type annotates an Avro <code>long</code>, where the long stores the number of microseconds from 1 January 1970 00:00:00.000000.
+ </p>
+ </section>
+
+ <section>
<title>Duration</title>
<p>
The <code>duration</code> logical type represents an amount of time defined by a number of months, days and milliseconds. This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other standard periods such as years, quarters, hours and minutes can be expressed through these basic periods.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
index 84ba531..a297bc2 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
@@ -86,6 +86,12 @@ public class LogicalTypes {
case TIME_MICROS:
logicalType = TIME_MICROS_TYPE;
break;
+ case LOCAL_TIMESTAMP_MICROS:
+ logicalType = LOCAL_TIMESTAMP_MICROS_TYPE;
+ break;
+ case LOCAL_TIMESTAMP_MILLIS:
+ logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE;
+ break;
default:
final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
if (typeFactory != null) {
@@ -120,6 +126,8 @@ public class LogicalTypes {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
+ // Schema "logicalType" names for the local (time-zone-less) timestamp types.
+ private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
+ private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
/** Create a Decimal LogicalType with the given precision and scale 0 */
public static Decimal decimal(int precision) {
@@ -167,6 +175,18 @@ public class LogicalTypes {
return TIMESTAMP_MICROS_TYPE;
}
+ /** Shared singleton instance of the local-timestamp-millis logical type. */
+ private static final LocalTimestampMillis LOCAL_TIMESTAMP_MILLIS_TYPE = new LocalTimestampMillis();
+
+ /**
+ * Returns the shared {@link LocalTimestampMillis} logical type. Every call
+ * returns the same instance.
+ */
+ public static LocalTimestampMillis localTimestampMillis() {
+ return LOCAL_TIMESTAMP_MILLIS_TYPE;
+ }
+
+ /** Shared singleton instance of the local-timestamp-micros logical type. */
+ private static final LocalTimestampMicros LOCAL_TIMESTAMP_MICROS_TYPE = new LocalTimestampMicros();
+
+ /**
+ * Returns the shared {@link LocalTimestampMicros} logical type. Every call
+ * returns the same instance.
+ */
+ public static LocalTimestampMicros localTimestampMicros() {
+ return LOCAL_TIMESTAMP_MICROS_TYPE;
+ }
+
/** Decimal represents arbitrary-precision fixed-scale decimal numbers */
public static class Decimal extends LogicalType {
private static final String PRECISION_PROP = "precision";
@@ -359,4 +379,32 @@ public class LogicalTypes {
}
}
+ /**
+ * Logical type "local-timestamp-millis": a timestamp in an unspecified local
+ * time zone with millisecond precision, stored in an Avro long.
+ */
+ public static class LocalTimestampMillis extends LogicalType {
+ // Private: obtain the shared instance via LogicalTypes.localTimestampMillis().
+ private LocalTimestampMillis() {
+ super(LOCAL_TIMESTAMP_MILLIS);
+ }
+
+ /**
+ * Runs the base validation, then rejects any schema whose type is not LONG.
+ *
+ * @throws IllegalArgumentException if the underlying schema is not a long
+ */
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ if (schema.getType() != Schema.Type.LONG) {
+ throw new IllegalArgumentException("Local timestamp (millis) can only be used with an underlying long type");
+ }
+ }
+ }
+
+ /**
+ * Logical type "local-timestamp-micros": a timestamp in an unspecified local
+ * time zone with microsecond precision, stored in an Avro long.
+ */
+ public static class LocalTimestampMicros extends LogicalType {
+ // Private: obtain the shared instance via LogicalTypes.localTimestampMicros().
+ private LocalTimestampMicros() {
+ super(LOCAL_TIMESTAMP_MICROS);
+ }
+
+ /**
+ * Runs the base validation, then rejects any schema whose type is not LONG.
+ *
+ * @throws IllegalArgumentException if the underlying schema is not a long
+ */
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ if (schema.getType() != Schema.Type.LONG) {
+ throw new IllegalArgumentException("Local timestamp (micros) can only be used with an underlying long type");
+ }
+ }
+ }
+
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java b/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
index 1351ccb..52ec426 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
@@ -25,7 +25,9 @@ import org.apache.avro.Schema;
import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
public class TimeConversions {
@@ -201,4 +203,66 @@ public class TimeConversions {
return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
}
}
+
+ /**
+ * Converts between {@link LocalDateTime} and the long encoding of
+ * "local-timestamp-millis". The wall-clock value is mapped to and from an
+ * epoch count by treating it as if it were in UTC, so no time zone shift is
+ * applied. The epoch arithmetic itself is delegated to
+ * {@link TimestampMillisConversion}.
+ */
+ public static class LocalTimestampMillisConversion extends Conversion<LocalDateTime> {
+ private final TimestampMillisConversion timestampMillisConversion = new TimestampMillisConversion();
+
+ @Override
+ public Class<LocalDateTime> getConvertedType() {
+ return LocalDateTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "local-timestamp-millis";
+ }
+
+ @Override
+ public LocalDateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
+ Instant instant = timestampMillisConversion.fromLong(millisFromEpoch, schema, type);
+ // Read the instant's UTC wall-clock fields back as a zone-less LocalDateTime.
+ return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+
+ @Override
+ public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
+ // Pin the zone-less value to UTC only to obtain an epoch-relative count.
+ Instant instant = timestamp.toInstant(ZoneOffset.UTC);
+ return timestampMillisConversion.toLong(instant, schema, type);
+ }
+
+ /** Returns a long schema annotated with local-timestamp-millis. */
+ @Override
+ public Schema getRecommendedSchema() {
+ return LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ }
+ }
+
+ /**
+ * Converts between {@link LocalDateTime} and the long encoding of
+ * "local-timestamp-micros". The wall-clock value is mapped to and from an
+ * epoch count by treating it as if it were in UTC, so no time zone shift is
+ * applied. The epoch arithmetic itself is delegated to
+ * {@link TimestampMicrosConversion}.
+ */
+ public static class LocalTimestampMicrosConversion extends Conversion<LocalDateTime> {
+ private final TimestampMicrosConversion timestampMicrosConversion = new TimestampMicrosConversion();
+
+ @Override
+ public Class<LocalDateTime> getConvertedType() {
+ return LocalDateTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "local-timestamp-micros";
+ }
+
+ @Override
+ public LocalDateTime fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
+ Instant instant = timestampMicrosConversion.fromLong(microsFromEpoch, schema, type);
+ // Read the instant's UTC wall-clock fields back as a zone-less LocalDateTime.
+ return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+
+ @Override
+ public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
+ // Pin the zone-less value to UTC only to obtain an epoch-relative count.
+ Instant instant = timestamp.toInstant(ZoneOffset.UTC);
+ return timestampMicrosConversion.toLong(instant, schema, type);
+ }
+
+ /** Returns a long schema annotated with local-timestamp-micros. */
+ @Override
+ public Schema getRecommendedSchema() {
+ return LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+ }
+ }
}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
index 0603bfe..61f0b05 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
@@ -22,9 +22,13 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.*;
import org.apache.avro.*;
+import org.apache.avro.data.TimeConversions;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
@@ -44,9 +48,11 @@ public class TestGenericLogicalTypes {
public static final GenericData GENERIC = new GenericData();
@BeforeClass
- public static void addDecimalAndUUID() {
+ public static void addLogicalTypes() {
+ // Register on GENERIC every conversion the tests in this class rely on.
GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+ GENERIC.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+ GENERIC.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
}
@Test
@@ -268,4 +274,85 @@ public class TestGenericLogicalTypes {
Assert.assertEquals(original, copy);
}
+ @Test
+ public void testReadLocalTimestampMillis() throws IOException {
+ // Longs written with a plain long schema should be read back as LocalDateTime
+ // when the reader uses the local-timestamp-millis schema and GENERIC's conversions.
+ LogicalType timestamp = LogicalTypes.localTimestampMillis();
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+ // Plain decimal literals: a leading zero (e.g. 06) would make them octal.
+ LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 42000000);
+ LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
+ List<LocalDateTime> expected = Arrays.asList(i1, i2);
+
+ Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMillisConversion();
+
+ // use the conversion directly instead of relying on the write side
+ Long i1long = conversion.toLong(i1, longSchema, timestamp);
+ Long i2long = 0L;
+
+ File test = write(longSchema, i1long, i2long);
+ Assert.assertEquals("Should convert long to LocalDateTime", expected,
+ read(GENERIC.createDatumReader(timestampSchema), test));
+ }
+
+ @Test
+ public void testWriteLocalTimestampMillis() throws IOException {
+ // LocalDateTime values written through GENERIC with the local-timestamp-millis
+ // schema should read back as the longs the conversion produces.
+ LogicalType timestamp = LogicalTypes.localTimestampMillis();
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+ // Plain decimal literals: a leading zero (e.g. 06) would make them octal.
+ LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 42000000);
+ LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
+
+ Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMillisConversion();
+
+ Long d1long = conversion.toLong(i1, longSchema, timestamp);
+ Long d2long = 0L;
+ List<Long> expected = Arrays.asList(d1long, d2long);
+
+ File test = write(GENERIC, timestampSchema, i1, i2);
+ Assert.assertEquals("Should read LocalDateTime as longs", expected,
+ read(GenericData.get().createDatumReader(timestampSchema), test));
+ }
+
+ @Test
+ public void testReadLocalTimestampMicros() throws IOException {
+ // Longs written with a plain long schema should be read back as LocalDateTime
+ // when the reader uses the local-timestamp-micros schema and GENERIC's conversions.
+ LogicalType timestamp = LogicalTypes.localTimestampMicros();
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+ // Plain decimal literals: a leading zero (e.g. 06) would make them octal.
+ LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 420000);
+ LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, 4000), ZoneOffset.UTC);
+ List<LocalDateTime> expected = Arrays.asList(i1, i2);
+
+ Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMicrosConversion();
+
+ // use the conversion directly instead of relying on the write side
+ Long i1long = conversion.toLong(i1, longSchema, timestamp);
+ Long i2long = conversion.toLong(i2, longSchema, timestamp);
+
+ File test = write(longSchema, i1long, i2long);
+ Assert.assertEquals("Should convert long to LocalDateTime", expected,
+ read(GENERIC.createDatumReader(timestampSchema), test));
+ }
+
+ @Test
+ public void testWriteLocalTimestampMicros() throws IOException {
+ // LocalDateTime values written through GENERIC with the local-timestamp-micros
+ // schema should read back as the longs the conversion produces.
+ LogicalType timestamp = LogicalTypes.localTimestampMicros();
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+ // Plain decimal literals: a leading zero (e.g. 06) would make them octal.
+ LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 420000);
+ LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, 4000), ZoneOffset.UTC);
+
+ Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMicrosConversion();
+
+ Long d1long = conversion.toLong(i1, longSchema, timestamp);
+ Long d2long = conversion.toLong(i2, longSchema, timestamp);
+ List<Long> expected = Arrays.asList(d1long, d2long);
+
+ File test = write(GENERIC, timestampSchema, i1, i2);
+ Assert.assertEquals("Should read LocalDateTime as longs", expected,
+ read(GenericData.get().createDatumReader(timestampSchema), test));
+ }
}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
index a2b242d..066a801 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
@@ -21,9 +21,11 @@ package org.apache.avro.reflect;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
+import java.time.LocalDateTime;
import java.util.*;
import org.apache.avro.*;
+import org.apache.avro.data.TimeConversions;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
@@ -50,6 +52,7 @@ public class TestReflectLogicalTypes {
public static void addUUID() {
+ // NOTE(review): despite its name, this registers the UUID, Decimal and
+ // local-timestamp-millis conversions on REFLECT for all tests in this class.
REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion());
REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ REFLECT.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
}
@Test
@@ -598,6 +601,18 @@ public class TestReflectLogicalTypes {
read(REFLECT.createDatumReader(stringArraySchema), test).get(0));
}
+ @Test
+ public void testReflectedSchemaLocalDateTime() {
+ // With LocalTimestampMillisConversion registered on REFLECT, a LocalDateTime
+ // field should be reflected as a long annotated with local-timestamp-millis.
+ Schema actual = REFLECT.getSchema(RecordWithTimestamps.class);
+
+ Assert.assertEquals("Should have the correct record namespace", "org.apache.avro.reflect", actual.getNamespace());
+ Assert.assertEquals("Should have the correct record name", "RecordWithTimestamps", actual.getName());
+ Assert.assertEquals("Should have the correct physical type", Schema.Type.LONG,
+ actual.getField("localDateTime").schema().getType());
+ Assert.assertEquals("Should have the correct logical type", LogicalTypes.localTimestampMillis(),
+ LogicalTypes.fromSchema(actual.getField("localDateTime").schema()));
+ }
+
private static <D> List<D> read(DatumReader<D> reader, File file) throws IOException {
List<D> data = new ArrayList<>();
@@ -713,3 +728,24 @@ class RecordWithUUIDList {
return this.uuids.equals(that.uuids);
}
}
+
+/**
+ * Simple record with a single {@link LocalDateTime} field, reflected by
+ * testReflectedSchemaLocalDateTime. Equality and hashing are based on that field.
+ */
+class RecordWithTimestamps {
+ LocalDateTime localDateTime;
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(localDateTime);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ // instanceof is false for null, so this also rejects a null argument.
+ if (!(obj instanceof RecordWithTimestamps)) {
+ return false;
+ }
+ RecordWithTimestamps that = (RecordWithTimestamps) obj;
+ // Compare this instance's field with the other's. Comparing
+ // that.localDateTime with itself made every pair of instances equal.
+ return Objects.equals(this.localDateTime, that.localDateTime);
+ }
+}
diff --git a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
index 97a9fad..a7053a4 100644
--- a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
+++ b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
@@ -242,6 +242,7 @@ TOKEN :
| < TIME: "time_ms" >
| < TIMESTAMP: "timestamp_ms" >
| < DECIMAL: "decimal" >
+| < LOCAL_TIMESTAMP: "local_timestamp_ms" >
}
/* LITERALS */
@@ -1497,6 +1498,7 @@ Schema PrimitiveType():
| "date" { return LogicalTypes.date().addToSchema(Schema.create(Type.INT)); }
| "time_ms" { return LogicalTypes.timeMillis().addToSchema(Schema.create(Type.INT)); }
| "timestamp_ms" { return LogicalTypes.timestampMillis().addToSchema(Schema.create(Type.LONG)); }
+| "local_timestamp_ms" { return LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Type.LONG)); }
| "decimal" { return DecimalTypeProperties(); }
}
@@ -1580,6 +1582,7 @@ Token AnyIdentifier():
t = <DATE> |
t = <TIME> |
t = <TIMESTAMP> |
+ t = <LOCAL_TIMESTAMP> |
t = <DECIMAL> |
t = <IDENTIFIER>)
{
diff --git a/lang/java/compiler/src/test/idl/input/mr_events.avdl b/lang/java/compiler/src/test/idl/input/mr_events.avdl
index 637884a..ffb90e9 100644
--- a/lang/java/compiler/src/test/idl/input/mr_events.avdl
+++ b/lang/java/compiler/src/test/idl/input/mr_events.avdl
@@ -66,6 +66,7 @@ protocol Events {
string jobName;
string userName;
timestamp_ms submitTime;
+ local_timestamp_ms submitTimeLocal;
string jobConfPath;
}
diff --git a/lang/java/compiler/src/test/idl/output/mr_events.avpr b/lang/java/compiler/src/test/idl/output/mr_events.avpr
index 58fd991..e9a32e1 100644
--- a/lang/java/compiler/src/test/idl/output/mr_events.avpr
+++ b/lang/java/compiler/src/test/idl/output/mr_events.avpr
@@ -113,6 +113,9 @@
"name" : "submitTime",
"type" : {"type": "long", "logicalType": "timestamp-millis"}
}, {
+ "name" : "submitTimeLocal",
+ "type" : {"type": "long", "logicalType": "local-timestamp-millis"}
+ }, {
"name" : "jobConfPath",
"type" : "string"
} ]