You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by nk...@apache.org on 2019/07/08 14:44:05 UTC

[avro] branch master updated: AVRO-2328: Support distinguishing between LocalDateTime and Instant semantics in timestamps (#525)

This is an automated email from the ASF dual-hosted git repository.

nkollar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new a2098bd  AVRO-2328: Support distinguishing between LocalDateTime and Instant semantics in timestamps (#525)
a2098bd is described below

commit a2098bd88361b5ff9298c462a2dccafbb0c2c508
Author: Nándor Kollár <na...@users.noreply.github.com>
AuthorDate: Mon Jul 8 16:43:59 2019 +0200

    AVRO-2328: Support distinguishing between LocalDateTime and Instant semantics in timestamps (#525)
    
    New logical types (local-timestamp-millis and local-timestamp-micros) for local timestamp semantics.
---
 doc/src/content/xdocs/spec.xml                     | 24 ++++++
 .../main/java/org/apache/avro/LogicalTypes.java    | 48 ++++++++++++
 .../java/org/apache/avro/data/TimeConversions.java | 64 ++++++++++++++++
 .../avro/generic/TestGenericLogicalTypes.java      | 89 +++++++++++++++++++++-
 .../avro/reflect/TestReflectLogicalTypes.java      | 36 +++++++++
 .../javacc/org/apache/avro/compiler/idl/idl.jj     |  3 +
 .../compiler/src/test/idl/input/mr_events.avdl     |  1 +
 .../compiler/src/test/idl/output/mr_events.avpr    |  3 +
 8 files changed, 267 insertions(+), 1 deletion(-)

diff --git a/doc/src/content/xdocs/spec.xml b/doc/src/content/xdocs/spec.xml
index 7c01cd0..466cd73 100644
--- a/doc/src/content/xdocs/spec.xml
+++ b/doc/src/content/xdocs/spec.xml
@@ -1514,6 +1514,8 @@ void initFPTable() {
         <title>Timestamp (millisecond precision)</title>
         <p>
           The <code>timestamp-millis</code> logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond.
+          Please note that time zone information is lost when a value is stored with this logical type. Upon reading a value back, only the instant can be reconstructed, not its original representation.
+          In practice, such timestamps are typically displayed to users in their local time zone, so the same value may be displayed differently depending on the execution environment.
         </p>
         <p>
           A <code>timestamp-millis</code> logical type annotates an Avro <code>long</code>, where the long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC.
@@ -1524,6 +1526,8 @@ void initFPTable() {
         <title>Timestamp (microsecond precision)</title>
         <p>
           The <code>timestamp-micros</code> logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one microsecond.
+          Please note that time zone information is lost when a value is stored with this logical type. Upon reading a value back, only the instant can be reconstructed, not its original representation.
+          In practice, such timestamps are typically displayed to users in their local time zone, so the same value may be displayed differently depending on the execution environment.
         </p>
         <p>
           A <code>timestamp-micros</code> logical type annotates an Avro <code>long</code>, where the long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
@@ -1531,6 +1535,26 @@ void initFPTable() {
       </section>
 
       <section>
+        <title>Local timestamp (millisecond precision)</title>
+        <p>
+          The <code>local-timestamp-millis</code> logical type represents a timestamp in a local time zone, regardless of what specific time zone is considered local, with a precision of one millisecond.
+        </p>
+        <p>
+          A <code>local-timestamp-millis</code> logical type annotates an Avro <code>long</code>, where the long stores the number of milliseconds from 1 January 1970 00:00:00.000.
+        </p>
+      </section>
+
+      <section>
+        <title>Local timestamp (microsecond precision)</title>
+        <p>
+          The <code>local-timestamp-micros</code> logical type represents a timestamp in a local time zone, regardless of what specific time zone is considered local, with a precision of one microsecond.
+        </p>
+        <p>
+          A <code>local-timestamp-micros</code> logical type annotates an Avro <code>long</code>, where the long stores the number of microseconds from 1 January 1970 00:00:00.000000.
+        </p>
+      </section>
+
+      <section>
         <title>Duration</title>
         <p>
           The <code>duration</code> logical type represents an amount of time defined by a number of months, days and milliseconds. This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other standard periods such as years, quarters, hours and minutes can be expressed through these basic periods.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
index 84ba531..a297bc2 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
@@ -86,6 +86,12 @@ public class LogicalTypes {
       case TIME_MICROS:
         logicalType = TIME_MICROS_TYPE;
         break;
+      case LOCAL_TIMESTAMP_MICROS:
+        logicalType = LOCAL_TIMESTAMP_MICROS_TYPE;
+        break;
+      case LOCAL_TIMESTAMP_MILLIS:
+        logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE;
+        break;
       default:
         final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
         if (typeFactory != null) {
@@ -120,6 +126,8 @@ public class LogicalTypes {
   private static final String TIME_MICROS = "time-micros";
   private static final String TIMESTAMP_MILLIS = "timestamp-millis";
   private static final String TIMESTAMP_MICROS = "timestamp-micros";
+  private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
+  private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
 
   /** Create a Decimal LogicalType with the given precision and scale 0 */
   public static Decimal decimal(int precision) {
@@ -167,6 +175,18 @@ public class LogicalTypes {
     return TIMESTAMP_MICROS_TYPE;
   }
 
+  private static final LocalTimestampMillis LOCAL_TIMESTAMP_MILLIS_TYPE = new LocalTimestampMillis();
+  private static final LocalTimestampMicros LOCAL_TIMESTAMP_MICROS_TYPE = new LocalTimestampMicros();
+
+  /** @return the shared {@code local-timestamp-millis} logical type instance */
+  public static LocalTimestampMillis localTimestampMillis() {
+    return LOCAL_TIMESTAMP_MILLIS_TYPE;
+  }
+  /** @return the shared {@code local-timestamp-micros} logical type instance */
+  public static LocalTimestampMicros localTimestampMicros() {
+    return LOCAL_TIMESTAMP_MICROS_TYPE;
+  }
+
   /** Decimal represents arbitrary-precision fixed-scale decimal numbers */
   public static class Decimal extends LogicalType {
     private static final String PRECISION_PROP = "precision";
@@ -359,4 +379,32 @@ public class LogicalTypes {
     }
   }
 
+  /** {@code local-timestamp-millis}: a timestamp in an unspecified local time zone, millisecond precision. */
+  public static class LocalTimestampMillis extends LogicalType {
+    private LocalTimestampMillis() { super(LOCAL_TIMESTAMP_MILLIS); }
+    /** Runs the base-class checks, then rejects any underlying schema that is not {@code long}. */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      final Schema.Type physicalType = schema.getType();
+      if (physicalType != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Local timestamp (millis) can only be used with an underlying long type");
+      }
+    }
+  }
+
+  /** {@code local-timestamp-micros}: a timestamp in an unspecified local time zone, microsecond precision. */
+  public static class LocalTimestampMicros extends LogicalType {
+    private LocalTimestampMicros() { super(LOCAL_TIMESTAMP_MICROS); }
+    /** Runs the base-class checks, then rejects any underlying schema that is not {@code long}. */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      final Schema.Type physicalType = schema.getType();
+      if (physicalType != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Local timestamp (micros) can only be used with an underlying long type");
+      }
+    }
+  }
+
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java b/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
index 1351ccb..52ec426 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
@@ -25,7 +25,9 @@ import org.apache.avro.Schema;
 
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneOffset;
 import java.util.concurrent.TimeUnit;
 
 public class TimeConversions {
@@ -201,4 +203,66 @@ public class TimeConversions {
       return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
     }
   }
+
+  /** Converts between {@code local-timestamp-millis} longs and {@link LocalDateTime} values. */
+  public static class LocalTimestampMillisConversion extends Conversion<LocalDateTime> {
+    // Delegate to the Instant conversion, pinning the wall-clock value to UTC in both directions.
+    private final TimestampMillisConversion instantConversion = new TimestampMillisConversion();
+
+    @Override
+    public Class<LocalDateTime> getConvertedType() {
+      return LocalDateTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "local-timestamp-millis";
+    }
+
+    @Override
+    public LocalDateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
+      return LocalDateTime.ofInstant(instantConversion.fromLong(millisFromEpoch, schema, type), ZoneOffset.UTC);
+    }
+
+    @Override
+    public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
+      return instantConversion.toLong(timestamp.toInstant(ZoneOffset.UTC), schema, type);
+    }
+
+    @Override
+    public Schema getRecommendedSchema() {
+      return LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+    }
+  }
+
+  /** Converts between {@code local-timestamp-micros} longs and {@link LocalDateTime} values. */
+  public static class LocalTimestampMicrosConversion extends Conversion<LocalDateTime> {
+    // Delegate to the Instant conversion, pinning the wall-clock value to UTC in both directions.
+    private final TimestampMicrosConversion instantConversion = new TimestampMicrosConversion();
+
+    @Override
+    public Class<LocalDateTime> getConvertedType() {
+      return LocalDateTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "local-timestamp-micros";
+    }
+
+    @Override
+    public LocalDateTime fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
+      return LocalDateTime.ofInstant(instantConversion.fromLong(microsFromEpoch, schema, type), ZoneOffset.UTC);
+    }
+
+    @Override
+    public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
+      return instantConversion.toLong(timestamp.toInstant(ZoneOffset.UTC), schema, type);
+    }
+
+    @Override
+    public Schema getRecommendedSchema() {
+      return LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+    }
+  }
 }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
index 0603bfe..61f0b05 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
@@ -22,9 +22,13 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.*;
 
 import org.apache.avro.*;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
@@ -44,9 +48,11 @@ public class TestGenericLogicalTypes {
   public static final GenericData GENERIC = new GenericData();
 
   @BeforeClass
-  public static void addDecimalAndUUID() {
+  public static void addLogicalTypes() { // registers the logical-type conversions used through GENERIC
     GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
     GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+    GENERIC.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); // LocalDateTime <-> local-timestamp-micros
+    GENERIC.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); // LocalDateTime <-> local-timestamp-millis
   }
 
   @Test
@@ -268,4 +274,85 @@ public class TestGenericLogicalTypes {
     Assert.assertEquals(original, copy);
   }
 
+  /** Longs written with a plain long schema should read back as LocalDateTime via GENERIC. */
+  @Test
+  public void testReadLocalTimestampMillis() throws IOException {
+    LogicalType timestamp = LogicalTypes.localTimestampMillis();
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+    LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 42000000);
+    LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
+    List<LocalDateTime> expected = Arrays.asList(i1, i2);
+
+    Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMillisConversion();
+    // use the conversion directly instead of relying on the write side
+    Long i1long = conversion.toLong(i1, longSchema, timestamp);
+    Long i2long = 0L;
+
+    File test = write(longSchema, i1long, i2long);
+    Assert.assertEquals("Should convert long to LocalDateTime", expected,
+        read(GENERIC.createDatumReader(timestampSchema), test));
+  }
+
+  /** Writing LocalDateTime values through GENERIC should store the longs the conversion produces. */
+  @Test
+  public void testWriteLocalTimestampMillis() throws IOException {
+    LogicalType timestamp = LogicalTypes.localTimestampMillis();
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+    LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 42000000);
+    LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
+
+    Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMillisConversion();
+    Long d1long = conversion.toLong(i1, longSchema, timestamp);
+    Long d2long = 0L;
+    List<Long> expected = Arrays.asList(d1long, d2long);
+
+    File test = write(GENERIC, timestampSchema, i1, i2);
+    Assert.assertEquals("Should read LocalDateTime as longs", expected,
+        read(GenericData.get().createDatumReader(timestampSchema), test));
+  }
+
+  /** Longs written with a plain long schema should read back as LocalDateTime (micros) via GENERIC. */
+  @Test
+  public void testReadLocalTimestampMicros() throws IOException {
+    LogicalType timestamp = LogicalTypes.localTimestampMicros();
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+    LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 420000);
+    LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, 4000), ZoneOffset.UTC);
+    List<LocalDateTime> expected = Arrays.asList(i1, i2);
+
+    Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMicrosConversion();
+    // use the conversion directly instead of relying on the write side
+    Long i1long = conversion.toLong(i1, longSchema, timestamp);
+    Long i2long = conversion.toLong(i2, longSchema, timestamp);
+
+    File test = write(longSchema, i1long, i2long);
+    Assert.assertEquals("Should convert long to LocalDateTime", expected,
+        read(GENERIC.createDatumReader(timestampSchema), test));
+  }
+
+  /** Writing LocalDateTime values through GENERIC should store the longs the micros conversion produces. */
+  @Test
+  public void testWriteLocalTimestampMicros() throws IOException {
+    LogicalType timestamp = LogicalTypes.localTimestampMicros();
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema timestampSchema = timestamp.addToSchema(Schema.create(Schema.Type.LONG));
+
+    LocalDateTime i1 = LocalDateTime.of(1986, 6, 26, 12, 7, 11, 420000);
+    LocalDateTime i2 = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, 4000), ZoneOffset.UTC);
+
+    Conversion<LocalDateTime> conversion = new TimeConversions.LocalTimestampMicrosConversion();
+    Long d1long = conversion.toLong(i1, longSchema, timestamp);
+    Long d2long = conversion.toLong(i2, longSchema, timestamp);
+    List<Long> expected = Arrays.asList(d1long, d2long);
+
+    File test = write(GENERIC, timestampSchema, i1, i2);
+    Assert.assertEquals("Should read LocalDateTime as longs", expected,
+        read(GenericData.get().createDatumReader(timestampSchema), test));
+  }
 }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
index a2b242d..066a801 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
@@ -21,9 +21,11 @@ package org.apache.avro.reflect;
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.time.LocalDateTime;
 import java.util.*;
 
 import org.apache.avro.*;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
@@ -50,6 +52,7 @@ public class TestReflectLogicalTypes {
   public static void addUUID() {
     REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion());
     REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    REFLECT.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); // LocalDateTime <-> local-timestamp-millis; NOTE(review): method name no longer reflects everything registered here
   }
 
   @Test
@@ -598,6 +601,18 @@ public class TestReflectLogicalTypes {
         read(REFLECT.createDatumReader(stringArraySchema), test).get(0));
   }
 
+  /** ReflectData should map a LocalDateTime field to local-timestamp-millis over a long. */
+  @Test
+  public void testReflectedSchemaLocalDateTime() {
+    Schema actual = REFLECT.getSchema(RecordWithTimestamps.class);
+    Assert.assertEquals("Should have the correct record namespace", "org.apache.avro.reflect", actual.getNamespace());
+    Assert.assertEquals("Should have the correct record name", "RecordWithTimestamps", actual.getName());
+    Assert.assertEquals("Should have the correct physical type", Schema.Type.LONG,
+        actual.getField("localDateTime").schema().getType());
+    Assert.assertEquals("Should have the correct logical type", LogicalTypes.localTimestampMillis(),
+        LogicalTypes.fromSchema(actual.getField("localDateTime").schema()));
+  }
+
   private static <D> List<D> read(DatumReader<D> reader, File file) throws IOException {
     List<D> data = new ArrayList<>();
 
@@ -713,3 +728,24 @@ class RecordWithUUIDList {
     return this.uuids.equals(that.uuids);
   }
 }
+
+/** Reflect-test fixture holding a single {@link LocalDateTime} field. */
+class RecordWithTimestamps {
+  LocalDateTime localDateTime;
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(localDateTime);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so this also covers the null case.
+    if (!(obj instanceof RecordWithTimestamps)) {
+      return false;
+    }
+    RecordWithTimestamps that = (RecordWithTimestamps) obj;
+    // Compare this instance's field with the other one's, not the other's with itself.
+    return Objects.equals(localDateTime, that.localDateTime);
+  }
+}
diff --git a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
index 97a9fad..a7053a4 100644
--- a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
+++ b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
@@ -242,6 +242,7 @@ TOKEN :
 | < TIME: "time_ms" >
 | < TIMESTAMP: "timestamp_ms" >
 | < DECIMAL: "decimal" >
+| < LOCAL_TIMESTAMP: "local_timestamp_ms" >
 }
 
 /* LITERALS */
@@ -1497,6 +1498,7 @@ Schema PrimitiveType():
 | "date" { return LogicalTypes.date().addToSchema(Schema.create(Type.INT)); }
 | "time_ms" { return LogicalTypes.timeMillis().addToSchema(Schema.create(Type.INT)); }
 | "timestamp_ms" { return LogicalTypes.timestampMillis().addToSchema(Schema.create(Type.LONG)); }
+| "local_timestamp_ms" { return LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Type.LONG)); }
 | "decimal" { return DecimalTypeProperties(); }
 }
 
@@ -1580,6 +1582,7 @@ Token AnyIdentifier():
    t = <DATE> |
    t = <TIME> |
    t = <TIMESTAMP> |
+   t = <LOCAL_TIMESTAMP> |
    t = <DECIMAL> |
    t = <IDENTIFIER>)
   {
diff --git a/lang/java/compiler/src/test/idl/input/mr_events.avdl b/lang/java/compiler/src/test/idl/input/mr_events.avdl
index 637884a..ffb90e9 100644
--- a/lang/java/compiler/src/test/idl/input/mr_events.avdl
+++ b/lang/java/compiler/src/test/idl/input/mr_events.avdl
@@ -66,6 +66,7 @@ protocol Events {
     string jobName;
     string userName;
     timestamp_ms submitTime;
+    local_timestamp_ms submitTimeLocal;
     string jobConfPath;
   }
 
diff --git a/lang/java/compiler/src/test/idl/output/mr_events.avpr b/lang/java/compiler/src/test/idl/output/mr_events.avpr
index 58fd991..e9a32e1 100644
--- a/lang/java/compiler/src/test/idl/output/mr_events.avpr
+++ b/lang/java/compiler/src/test/idl/output/mr_events.avpr
@@ -113,6 +113,9 @@
       "name" : "submitTime",
       "type" : {"type": "long", "logicalType": "timestamp-millis"}
     }, {
+      "name" : "submitTimeLocal",
+      "type" : {"type": "long", "logicalType": "local-timestamp-millis"}
+    }, {
       "name" : "jobConfPath",
       "type" : "string"
     } ]