Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:39:24 UTC

[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

robinyqiu commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r461882367



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+  private final Schema schema =

Review comment:
       This should be a constant: make it `static` and give it an upper-case name such as `DATETIME_SCHEMA`.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.

Review comment:
       I would mention that these two longs are the same as the base types of `Date` and `Time`.
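
To make the layout concrete, here is a minimal sketch, using only `java.time`, of how a `LocalDateTime` could map onto the two longs described in the Javadoc (epoch day and nanosecond of day) and back. The class and method names are hypothetical and not part of the PR, and the Beam `Row` is replaced by a plain `long[]` so the sketch stays self-contained.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical stand-in for the DateTime logical type's conversions; not Beam code.
final class DateTimeBaseTypeSketch {
  // Index 0 plays the role of the "Date" field, index 1 the "Time" field.
  static long[] toBase(LocalDateTime input) {
    return new long[] {
      input.toLocalDate().toEpochDay(), // days since 1970-01-01
      input.toLocalTime().toNanoOfDay() // nanoseconds since midnight
    };
  }

  // Rebuilds the LocalDateTime from the two longs.
  static LocalDateTime toInput(long[] base) {
    return LocalDateTime.of(LocalDate.ofEpochDay(base[0]), LocalTime.ofNanoOfDay(base[1]));
  }

  public static void main(String[] args) {
    LocalDateTime original = LocalDateTime.of(2008, 12, 25, 15, 30, 0);
    long[] base = toBase(original);
    System.out.println(base[0] + " days, " + base[1] + " ns -> " + toInput(base));
  }
}
```

The same two conversions (`LocalDate.ofEpochDay` and `LocalTime.ofNanoOfDay`) appear in the `ZetaSqlBeamTranslationUtils` hunk of this review.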

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+  private final Schema schema =
+      Schema.builder().addInt64Field("Date").addInt64Field("Time").build();

Review comment:
       `"Date"` and `"Time"` are also used below. We should make them constants as well.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##########
@@ -184,6 +188,19 @@ private static Value beamLogicalObjectToZetaSqlValue(Object object, String ident
       } else { // input type
         return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
       }
+    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+      // DateTime value
+      LocalDateTime datetime;
+      if (object instanceof Row) { // base type
+        datetime =
+            LocalDateTime.of(
+                LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+                LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+      } else { // input type
+        datetime = (LocalDateTime) object;
+      }
+      return Value.createDatetimeValue(

Review comment:
       Internally we have a `Value.createDatetimeValue()` method that takes a Java `LocalDateTime`. I think that is what we want here, but it is not available yet because it has not been open-sourced in ZetaSQL.
   
   (This is not a comment, but a note to ourselves.)

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -850,6 +852,38 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) {
         // TODO: Doing micro to mills truncation, need to throw exception.
         ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
         break;
+      case TYPE_DATETIME:
+        // Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type
+        // because later it will be unparsed to the string representation of timestamp (e.g. "SELECT
+        // DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25
+        // 15:30:00:000000'"). So we create a wrapper function here such that we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        LocalDateTime dateTime =

Review comment:
       Could you add a `convertDateTimeValueToTimeString` helper method to `DateTimeUtils`, like we did for Date and Time?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
##########
@@ -166,6 +168,12 @@ public void unparseCall(
         break;
       case OTHER_FUNCTION:
         String funName = call.getOperator().getName();
+        if (DATETIME_LITERAL_FUNCTION.equals(funName)) {
+          // self-designed function dealing with the unparsing of ZetaSQL DATETIME literal, to
+          // differentiate it from ZetaSQL TIMESTAMP literal
+          unparseDateTimeLiteralWrapperFunction(writer, call, leftPrec, rightPrec);
+          break;
+        }

Review comment:
       Use `else if` here to be consistent?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
##########
@@ -258,23 +257,23 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
           // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime

Review comment:
       Could you implement the support for this and add a test as well?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -850,6 +852,38 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) {
         // TODO: Doing micro to mills truncation, need to throw exception.
         ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
         break;
+      case TYPE_DATETIME:
+        // Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type
+        // because later it will be unparsed to the string representation of timestamp (e.g. "SELECT
+        // DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25
+        // 15:30:00:000000'"). So we create a wrapper function here such that we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        LocalDateTime dateTime =
+            CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+        TimestampString tsString =
+            new TimestampString(
+                    dateTime.getYear(),
+                    dateTime.getMonthValue(),
+                    dateTime.getDayOfMonth(),
+                    dateTime.getHour(),
+                    dateTime.getMinute(),
+                    dateTime.getSecond())
+                .withNanos(dateTime.getNano());
+
+        ret =
+            rexBuilder()
+                .makeCall(
+                    SqlOperators.createZetaSqlFunction(
+                        BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,
+                        ZetaSqlCalciteTranslationUtils.toCalciteTypeName(kind)),
+                    ImmutableList.of(

Review comment:
       You don't need to create a list here. I think there is another overload of this function that takes a single operand.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
##########
@@ -260,12 +261,24 @@
 
   private static final Schema TABLE_WTH_NUMERIC_SCHEMA =
       Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build();
+
   public static final TestBoundedTable TABLE_WITH_NUMERIC =
       TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA)
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3");
 
+  private static final Schema TABLE_WTH_DATETIME_SCHEMA =

Review comment:
       Could you please rename all `WTH` to `WITH` while you are at it? Thanks!

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
##########
@@ -753,13 +786,415 @@ public void testParseTime() {
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
-  @Ignore("Does not support Datetime literal.")
-  public void testDatetimeLiteral() {
-    String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
+  public void testDateTimeLiteral() {
+    String sql = "SELECT DATETIME '2008-12-25 15:30:00'";
+
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
-    zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeColumn() {
+    String sql = "SELECT FORMAT_DATETIME('%b-%d-%Y', datetime_field) FROM table_with_datetime";

Review comment:
       Could you also include a time component in the format string here?
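
As a rough illustration of what a format string with a time component would produce, the sketch below uses Java's `DateTimeFormatter` as a stand-in for `FORMAT_DATETIME('%b-%d-%Y %H:%M:%S', ...)`. The extended format string, the sample value, and the class name are assumptions for illustration only; this is not how Beam or ZetaSQL implements the function.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical illustration; the Java pattern only approximates '%b-%d-%Y %H:%M:%S'.
final class FormatDateTimeSketch {
  private static final DateTimeFormatter DATE_AND_TIME =
      DateTimeFormatter.ofPattern("MMM-dd-yyyy HH:mm:ss", Locale.US);

  static String format(LocalDateTime value) {
    return DATE_AND_TIME.format(value);
  }

  public static void main(String[] args) {
    // Assumed sample value; the actual rows of table_with_datetime are not shown in this review.
    System.out.println(format(LocalDateTime.of(2008, 12, 25, 15, 30, 0)));
  }
}
```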

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
##########
@@ -218,6 +219,22 @@ public void testDateFromTimestamp() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test
+  public void testDateFromDateTime() {
+    String sql = "SELECT DATE(DATETIME '2008-12-25 15:30:00')";

Review comment:
       Use a `DATETIME` literal with a microsecond component to make the test more general (same for the tests below)?
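
For reference, a sketch of the Java-side values such a test could compare against if the literal were, say, `DATETIME '2008-12-25 15:30:00.123456'`. The literal and the class name are hypothetical; only `java.time` is used.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;

// Hypothetical expected values for a DATETIME literal with a microsecond component.
final class MicrosecondDateTimeSketch {
  // 123456 microseconds expressed as nanoseconds, the unit LocalDateTime uses.
  static final LocalDateTime WITH_MICROS = LocalDateTime.of(2008, 12, 25, 15, 30, 0, 123_456_000);

  // What DATE(<datetime>) would be expected to yield: the date part only.
  static LocalDate datePart(LocalDateTime value) {
    return value.toLocalDate();
  }

  public static void main(String[] args) {
    System.out.println(WITH_MICROS + " -> " + datePart(WITH_MICROS));
  }
}
```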

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
##########
@@ -31,4 +33,7 @@ private SqlTypes() {}
 
   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
   public static final LogicalType<LocalTime, Long> TIME = new Time();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATETIME type. */

Review comment:
       CalciteSQL does not have a DATETIME type. I think we don't need to mention CalciteSQL here.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
##########
@@ -253,6 +261,12 @@ private void unparseTrim(SqlWriter writer, SqlCall call, int leftPrec, int right
     writer.endFunCall(trimFrame);
   }
 
+  private void unparseDateTimeLiteralWrapperFunction(
+      SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+    writer.literal("DATETIME");
+    writer.literal(call.operand(0).toString().substring(DATETIME_LITERAL_OFFSET));

Review comment:
       I would recommend using `replace` (replace `TIMESTAMP` with `DATETIME`) here instead of `substring`. I think that's more readable.
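
A minimal sketch contrasting the two approaches on plain strings. It assumes the operand's string form starts with the keyword `TIMESTAMP` followed by the quoted value, which this review does not show; the class, method, and constant names are hypothetical.

```java
// Hypothetical comparison of offset-based and replace-based rewriting; not Beam code.
final class DateTimeLiteralUnparseSketch {
  private static final String TIMESTAMP_KEYWORD = "TIMESTAMP";
  private static final String DATETIME_KEYWORD = "DATETIME";

  // Offset-based: skip the leading keyword by length, then prepend DATETIME.
  static String viaSubstring(String timestampLiteral) {
    return DATETIME_KEYWORD + timestampLiteral.substring(TIMESTAMP_KEYWORD.length());
  }

  // Replace-based: states the intent directly. Note that String.replace swaps
  // every occurrence of the keyword, not only the leading one.
  static String viaReplace(String timestampLiteral) {
    return timestampLiteral.replace(TIMESTAMP_KEYWORD, DATETIME_KEYWORD);
  }

  public static void main(String[] args) {
    String literal = "TIMESTAMP '2008-12-25 15:30:00'"; // assumed string form
    System.out.println(viaSubstring(literal));
    System.out.println(viaReplace(literal));
  }
}
```

Both variants give the same result for the assumed input; the replace-based one avoids a separate offset constant.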



