You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/12 00:25:50 UTC

[beam] branch master updated: Use new ZetaSQL value create API (#12536)

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d7ecb2  Use new ZetaSQL value create API (#12536)
6d7ecb2 is described below

commit 6d7ecb260baff8fff4a50c323d30532c7dc0b4b3
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Tue Aug 11 17:25:36 2020 -0700

    Use new ZetaSQL value create API (#12536)
---
 .../sdk/extensions/sql/zetasql/DateTimeUtils.java   |  3 +--
 .../sql/zetasql/ZetaSqlBeamTranslationUtils.java    | 21 +++++++++------------
 .../sql/zetasql/ZetaSqlCalciteTranslationUtils.java |  8 +++-----
 .../zetasql/ZetaSqlBeamTranslationUtilsTest.java    | 14 +++++---------
 .../sql/zetasql/ZetaSqlTimeFunctionsTest.java       |  9 +++------
 5 files changed, 21 insertions(+), 34 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
index eb0da97..c639c09 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.Value;
 import io.grpc.Status;
 import java.time.LocalTime;
@@ -166,7 +165,7 @@ public class DateTimeUtils {
 
   public static Value parseTimeToValue(String timeString) {
     LocalTime localTime = LocalTime.parse(timeString);
-    return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos(localTime));
+    return Value.createTimeValue(localTime);
   }
 
   public static Value parseTimestampWithTZToValue(String timestampString) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 5a0762d..d38d597 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
 
 import com.google.protobuf.ByteString;
 import com.google.zetasql.ArrayType;
-import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
@@ -132,16 +131,16 @@ public final class ZetaSqlBeamTranslationUtils {
           if (object instanceof Long) { // base type
             return Value.createDateValue(((Long) object).intValue());
           } else { // input type
-            return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
+            return Value.createDateValue((LocalDate) object);
           }
         } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+          LocalTime localTime;
           if (object instanceof Long) { // base type
-            return Value.createTimeValue(
-                CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
+            localTime = LocalTime.ofNanoOfDay((Long) object);
           } else { // input type
-            return Value.createTimeValue(
-                CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
+            localTime = (LocalTime) object;
           }
+          return Value.createTimeValue(localTime);
         } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
           LocalDateTime datetime;
           if (object instanceof Row) { // base type
@@ -152,9 +151,7 @@ public final class ZetaSqlBeamTranslationUtils {
           } else { // input type
             datetime = (LocalDateTime) object;
           }
-          // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
-          return Value.createDatetimeValue(
-              CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
+          return Value.createDatetimeValue(datetime);
         } else {
           throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
         }
@@ -254,11 +251,11 @@ public final class ZetaSqlBeamTranslationUtils {
       case LOGICAL_TYPE:
         String identifier = fieldType.getLogicalType().getIdentifier();
         if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-          return LocalDate.ofEpochDay(value.getDateValue());
+          return value.getLocalDateValue();
         } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
-          return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+          return value.getLocalTimeValue();
         } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
-          return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+          return value.getLocalDateTimeValue();
         } else {
           throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
         }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 8586058..620342e 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
@@ -184,7 +183,7 @@ public final class ZetaSqlCalciteTranslationUtils {
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i; // empty field name is not allowed, generate an index-based name for it
+        name = "$col" + i; // avoid empty field names because Beam does not allow duplicate names
       }
       b.add(name);
     }
@@ -318,14 +317,13 @@ public final class ZetaSqlCalciteTranslationUtils {
   }
 
   private static TimeString timeValueToTimeString(Value value) {
-    LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+    LocalTime localTime = value.getLocalTimeValue();
     return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond())
         .withNanos(localTime.getNano());
   }
 
   private static TimestampString datetimeValueToTimestampString(Value value) {
-    LocalDateTime dateTime =
-        CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+    LocalDateTime dateTime = value.getLocalDateTimeValue();
     return new TimestampString(
             dateTime.getYear(),
             dateTime.getMonthValue(),
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index e6e54cf..27b3c8e 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.protobuf.ByteString;
 import com.google.zetasql.ArrayType;
-import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.TypeFactory;
@@ -100,8 +99,8 @@ public class ZetaSqlBeamTranslationUtilsTest {
           .addValue("Hello")
           .addValue(new byte[] {0x11, 0x22})
           .addValue(LocalDate.of(2020, 6, 4))
-          .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
-          .addValue(LocalTime.of(15, 30, 45))
+          .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+          .addValue(LocalTime.of(15, 30, 45).withNano(123456000))
           .addValue(Instant.ofEpochMilli(12345678L))
           .addArray(3.0, 6.5)
           .addValue(Row.withSchema(TEST_INNER_SCHEMA).addValues(0L, "world").build())
@@ -118,13 +117,10 @@ public class ZetaSqlBeamTranslationUtilsTest {
               Value.createBoolValue(false),
               Value.createStringValue("Hello"),
               Value.createBytesValue(ByteString.copyFrom(new byte[] {0x11, 0x22})),
-              Value.createDateValue((int) LocalDate.of(2020, 6, 4).toEpochDay()),
+              Value.createDateValue(LocalDate.of(2020, 6, 4)),
               Value.createDatetimeValue(
-                  CivilTimeEncoder.encodePacked64DatetimeSeconds(
-                      LocalDateTime.of(2008, 12, 25, 15, 30, 0)),
-                  LocalDateTime.of(2008, 12, 25, 15, 30, 0).getNano()),
-              Value.createTimeValue(
-                  CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.of(15, 30, 45))),
+                  LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000)),
+              Value.createTimeValue(LocalTime.of(15, 30, 45).withNano(123456000)),
               Value.createTimestampValueFromUnixMicros(12345678000L),
               Value.createArrayValue(
                   TEST_INNER_ARRAY_TYPE,
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
index 109ca1e..ca37715 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTime
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone;
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone;
 
-import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
 import java.time.LocalDate;
@@ -1049,12 +1048,10 @@ public class ZetaSqlTimeFunctionsTest extends ZetaSqlTestBase {
   public void testDateTimeAddWithParameter() {
     String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)";
 
-    LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 00).withNano(123456000);
+    LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000);
     ImmutableMap<String, Value> params =
         ImmutableMap.of(
-            "p0",
-                Value.createDatetimeValue(
-                    CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()),
+            "p0", Value.createDatetimeValue(datetime),
             "p1", Value.createInt64Value(3L));
 
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
@@ -1065,7 +1062,7 @@ public class ZetaSqlTimeFunctionsTest extends ZetaSqlTestBase {
         .containsInAnyOrder(
             Row.withSchema(
                     Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
-                .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 00).withNano(123456000))
+                .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 0).withNano(123456000))
                 .build());
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }