Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:25:39 UTC

[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r462561296



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+  private final Schema schema =

Review comment:
       Thanks. Done.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+  private final Schema schema =
+      Schema.builder().addInt64Field("Date").addInt64Field("Time").build();

Review comment:
       Done.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date
+ * field and Time field. Date field is a Long that represents incrementing count of days where day 0
+ * is 1970-01-01 (ISO). Time field is a Long that represents a count of time in nanoseconds.

Review comment:
       Makes sense. Done.
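
For reference, a minimal sketch of the base-type mapping the Javadoc describes: a day count since 1970-01-01 plus a nanosecond-of-day count. It uses only `java.time` and represents the two fields as plain `long` values rather than a Beam `Row`. The class and method names (`DateTimeBaseSketch`, `toBase`, `toInput`) are hypothetical and are not part of the PR.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical stand-in for the Row-based base type: two longs instead of a Beam Row. */
final class DateTimeBaseSketch {
  private DateTimeBaseSketch() {}

  /** Splits a LocalDateTime into {epochDay, nanoOfDay}. */
  static long[] toBase(LocalDateTime input) {
    return new long[] {input.toLocalDate().toEpochDay(), input.toLocalTime().toNanoOfDay()};
  }

  /** Rebuilds the LocalDateTime from the two base fields. */
  static LocalDateTime toInput(long epochDay, long nanoOfDay) {
    return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
  }

  public static void main(String[] args) {
    LocalDateTime original = LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000);
    long[] base = toBase(original);
    LocalDateTime restored = toInput(base[0], base[1]);
    System.out.println(base[0] + " days, " + base[1] + " ns -> " + restored);
  }
}
```

Neither field carries a zone or offset, which is consistent with the note that the type cannot represent an instant on its own.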

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
##########
@@ -31,4 +33,7 @@ private SqlTypes() {}
 
   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
   public static final LogicalType<LocalTime, Long> TIME = new Time();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATETIME type. */

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
##########
@@ -258,23 +257,23 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
           // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime

Review comment:
       Test cases added and passed.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
##########
@@ -260,12 +261,24 @@
 
   private static final Schema TABLE_WTH_NUMERIC_SCHEMA =
       Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build();
+
   public static final TestBoundedTable TABLE_WITH_NUMERIC =
       TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA)
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3");
 
+  private static final Schema TABLE_WTH_DATETIME_SCHEMA =

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##########
@@ -184,6 +188,19 @@ private static Value beamLogicalObjectToZetaSqlValue(Object object, String ident
       } else { // input type
         return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
       }
+    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+      // DateTime value
+      LocalDateTime datetime;
+      if (object instanceof Row) { // base type
+        datetime =
+            LocalDateTime.of(
+                LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+                LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+      } else { // input type
+        datetime = (LocalDateTime) object;
+      }
+      return Value.createDatetimeValue(

Review comment:
       Yes. It would be far more convenient to have a method that creates a ZetaSQL `DateTime` value directly from a Java `LocalDateTime` object. Currently we have to extract `bitFieldDatetimeSeconds` and `nanos` from the `LocalDateTime` object first.
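
To illustrate the two-step extraction, here is a rough sketch that splits a `LocalDateTime` into a seconds-precision bit field and a separate nanosecond value. The bit layout (second, minute, hour, day, month, year packed from the least significant bit with widths 6, 6, 5, 5, 4, 14) is an assumption based on my understanding of the packed datetime encoding and should be checked against `CivilTimeEncoder` before relying on it. `PackedDatetimeSketch`, `packSeconds` and `unpack` are hypothetical names, not ZetaSQL or Beam APIs.

```java
import java.time.LocalDateTime;

/** Hypothetical sketch; the shifts below are an assumed layout, not a confirmed one. */
final class PackedDatetimeSketch {
  private PackedDatetimeSketch() {}

  // Assumed field offsets, counted from the least significant bit.
  private static final int MINUTE_SHIFT = 6;
  private static final int HOUR_SHIFT = 12;
  private static final int DAY_SHIFT = 17;
  private static final int MONTH_SHIFT = 22;
  private static final int YEAR_SHIFT = 26;

  /** Packs the seconds-precision part; sub-second nanos are carried separately. */
  static long packSeconds(LocalDateTime dt) {
    return ((long) dt.getYear() << YEAR_SHIFT)
        | ((long) dt.getMonthValue() << MONTH_SHIFT)
        | ((long) dt.getDayOfMonth() << DAY_SHIFT)
        | ((long) dt.getHour() << HOUR_SHIFT)
        | ((long) dt.getMinute() << MINUTE_SHIFT)
        | dt.getSecond();
  }

  /** Reverses packSeconds and reattaches the nanosecond part. */
  static LocalDateTime unpack(long packed, int nanos) {
    int second = (int) (packed & 0x3F);
    int minute = (int) ((packed >> MINUTE_SHIFT) & 0x3F);
    int hour = (int) ((packed >> HOUR_SHIFT) & 0x1F);
    int day = (int) ((packed >> DAY_SHIFT) & 0x1F);
    int month = (int) ((packed >> MONTH_SHIFT) & 0xF);
    int year = (int) ((packed >> YEAR_SHIFT) & 0x3FFF);
    return LocalDateTime.of(year, month, day, hour, minute, second, nanos);
  }

  public static void main(String[] args) {
    LocalDateTime dt = LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000);
    long packed = packSeconds(dt);
    System.out.println(unpack(packed, dt.getNano()));
  }
}
```

A direct `LocalDateTime` factory method would hide both the packing and the separate nanosecond argument from callers.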

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -850,6 +852,38 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) {
         // TODO: Doing micro to mills truncation, need to throw exception.
         ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
         break;
+      case TYPE_DATETIME:
+        // Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type
+        // because later it will be unparsed to the string representation of timestamp (e.g. "SELECT
+        // DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25
+        // 15:30:00:000000'"). So we create a wrapper function here such that we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        LocalDateTime dateTime =
+            CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+        TimestampString tsString =
+            new TimestampString(
+                    dateTime.getYear(),
+                    dateTime.getMonthValue(),
+                    dateTime.getDayOfMonth(),
+                    dateTime.getHour(),
+                    dateTime.getMinute(),
+                    dateTime.getSecond())
+                .withNanos(dateTime.getNano());
+
+        ret =
+            rexBuilder()
+                .makeCall(
+                    SqlOperators.createZetaSqlFunction(
+                        BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,
+                        ZetaSqlCalciteTranslationUtils.toCalciteTypeName(kind)),
+                    ImmutableList.of(

Review comment:
       Got it. Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -850,6 +852,38 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) {
         // TODO: Doing micro to mills truncation, need to throw exception.
         ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
         break;
+      case TYPE_DATETIME:
+        // Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type
+        // because later it will be unparsed to the string representation of timestamp (e.g. "SELECT
+        // DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25
+        // 15:30:00:000000'"). So we create a wrapper function here such that we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        LocalDateTime dateTime =

Review comment:
       Helper method added.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
##########
@@ -166,6 +168,12 @@ public void unparseCall(
         break;
       case OTHER_FUNCTION:
         String funName = call.getOperator().getName();
+        if (DATETIME_LITERAL_FUNCTION.equals(funName)) {
+          // self-designed function dealing with the unparsing of ZetaSQL DATETIME literal, to
+          // differentiate it from ZetaSQL TIMESTAMP literal
+          unparseDateTimeLiteralWrapperFunction(writer, call, leftPrec, rightPrec);
+          break;
+        }

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
##########
@@ -253,6 +261,12 @@ private void unparseTrim(SqlWriter writer, SqlCall call, int leftPrec, int right
     writer.endFunCall(trimFrame);
   }
 
+  private void unparseDateTimeLiteralWrapperFunction(
+      SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+    writer.literal("DATETIME");
+    writer.literal(call.operand(0).toString().substring(DATETIME_LITERAL_OFFSET));

Review comment:
       Makes sense. Done.
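
A small sketch of the string handling behind this unparsing step, using plain strings instead of Calcite's `SqlWriter`. It assumes the wrapped operand renders as `TIMESTAMP '<value>'` and that the offset skips the keyword plus one space. Both the rendering and the offset value are assumptions for illustration, and `DatetimeUnparseSketch`, `ASSUMED_OFFSET` and `unparse` are hypothetical names.

```java
/** Hypothetical sketch of re-labelling a rendered TIMESTAMP literal as DATETIME. */
final class DatetimeUnparseSketch {
  private DatetimeUnparseSketch() {}

  // Assumption: the operand prints as "TIMESTAMP '<value>'", so skip the keyword and a space.
  private static final int ASSUMED_OFFSET = "TIMESTAMP ".length();

  /** Drops the assumed TIMESTAMP prefix and emits a DATETIME literal instead. */
  static String unparse(String renderedOperand) {
    if (!renderedOperand.startsWith("TIMESTAMP ")) {
      throw new IllegalArgumentException("Unexpected operand: " + renderedOperand);
    }
    return "DATETIME " + renderedOperand.substring(ASSUMED_OFFSET);
  }

  public static void main(String[] args) {
    System.out.println(unparse("TIMESTAMP '2008-12-25 15:30:00'"));
  }
}
```

The prefix check is an addition for the sketch. It makes the dependence on the operand's rendered form explicit instead of silently slicing an unexpected string.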

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
##########
@@ -218,6 +219,22 @@ public void testDateFromTimestamp() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test
+  public void testDateFromDateTime() {
+    String sql = "SELECT DATE(DATETIME '2008-12-25 15:30:00')";

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
##########
@@ -753,13 +786,415 @@ public void testParseTime() {
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
-  @Ignore("Does not support Datetime literal.")
-  public void testDatetimeLiteral() {
-    String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
+  public void testDateTimeLiteral() {
+    String sql = "SELECT DATETIME '2008-12-25 15:30:00'";
+
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
-    zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeColumn() {
+    String sql = "SELECT FORMAT_DATETIME('%b-%d-%Y', datetime_field) FROM table_with_datetime";

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
##########
@@ -258,23 +257,22 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
-

Review comment:
       Added back.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
##########
@@ -258,23 +257,22 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
-

Review comment:
       But then `SpotlessCheck` fails, and `SpotlessApply` automatically deletes that line.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
##########
@@ -753,13 +786,416 @@ public void testParseTime() {
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
-  @Ignore("Does not support Datetime literal.")
-  public void testDatetimeLiteral() {
-    String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
+  public void testDateTimeLiteral() {
+    String sql = "SELECT DATETIME '2008-12-25 15:30:00.123456'";
+
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
-    zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeColumn() {
+    String sql = "SELECT FORMAT_DATETIME('%D %T', datetime_field) FROM table_with_datetime";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+                .addValues("12/25/08 15:30:00")
+                .build(),
+            Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+                .addValues("10/06/12 11:45:00")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testGroupByDateTime() {
+    String sql = "SELECT datetime_field, COUNT(*) FROM table_with_datetime GROUP BY datetime_field";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder()
+            .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+            .addInt64Field("count")
+            .build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema).addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0), 1L).build(),
+            Row.withSchema(schema).addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0), 1L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testAggregateOnDateTime() {
+    String sql = "SELECT MAX(datetime_field) FROM table_with_datetime GROUP BY str_field";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+                        .build())
+                .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_DATETIME function ("SELECT CURRENT_DATETIME()")
+
+  @Test
+  public void testExtractFromDateTime() {
+    String sql =
+        "SELECT "
+            + "EXTRACT(YEAR FROM DATETIME '2008-12-25 15:30:00') as year, "
+            + "EXTRACT(QUARTER FROM DATETIME '2008-12-25 15:30:00') as quarter, "
+            + "EXTRACT(MONTH FROM DATETIME '2008-12-25 15:30:00') as month, "
+            // TODO[BEAM-9178]: Add tests for DATETIME_TRUNC and EXTRACT with "week with weekday"
+            //  date parts once they are supported
+            // + "EXTRACT(WEEK FROM DATETIME '2008-12-25 15:30:00') as week, "
+            + "EXTRACT(DAY FROM DATETIME '2008-12-25 15:30:00') as day, "
+            + "EXTRACT(DAYOFWEEK FROM DATETIME '2008-12-25 15:30:00') as dayofweek, "
+            + "EXTRACT(DAYOFYEAR FROM DATETIME '2008-12-25 15:30:00') as dayofyear, "
+            + "EXTRACT(HOUR FROM DATETIME '2008-12-25 15:30:00.123456') as hour, "
+            + "EXTRACT(MINUTE FROM DATETIME '2008-12-25 15:30:00.123456') as minute, "
+            + "EXTRACT(SECOND FROM DATETIME '2008-12-25 15:30:00.123456') as second, "
+            + "EXTRACT(MILLISECOND FROM DATETIME '2008-12-25 15:30:00.123456') as millisecond, "
+            + "EXTRACT(MICROSECOND FROM DATETIME '2008-12-25 15:30:00.123456') as microsecond, "
+            + "EXTRACT(DATE FROM DATETIME '2008-12-25 15:30:00.123456') as date, "
+            + "EXTRACT(TIME FROM DATETIME '2008-12-25 15:30:00.123456') as time ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder()
+            .addInt64Field("year")
+            .addInt64Field("quarter")
+            .addInt64Field("month")
+            // .addInt64Field("week")
+            .addInt64Field("day")
+            .addInt64Field("dayofweek")
+            .addInt64Field("dayofyear")
+            .addInt64Field("hour")
+            .addInt64Field("minute")
+            .addInt64Field("second")
+            .addInt64Field("millisecond")
+            .addInt64Field("microsecond")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema)
+                .addValues(
+                    2008L,
+                    4L,
+                    12L,
+                    // 52L,
+                    25L,
+                    5L,
+                    360L,
+                    15L,
+                    30L,
+                    0L,
+                    123L,
+                    123456L,
+                    LocalDate.of(2008, 12, 25),
+                    LocalTime.of(15, 30, 0, 123456000))
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromDateAndTime() {
+    String sql = "SELECT DATETIME(DATE '2008-12-25', TIME '15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromDate() {
+    String sql = "SELECT DATETIME(DATE '2008-12-25')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 0, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromYearMonthDayHourMinuteSecond() {
+    String sql = "SELECT DATETIME(2008, 12, 25, 15, 30, 0)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromTimestamp() {
+    String sql = "SELECT DATETIME(TIMESTAMP '2008-12-25 15:30:00+08', 'America/Los_Angeles')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 24, 23, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeAdd() {
+    String sql =
+        "SELECT "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+                        .build())
+                .addValues(
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000),
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000000),
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 10),
+                    LocalDateTime.of(2008, 12, 25, 15, 40, 0),
+                    LocalDateTime.of(2008, 12, 26, 1, 30, 0),
+                    LocalDateTime.of(2009, 1, 4, 15, 30, 0),
+                    LocalDateTime.of(2009, 10, 25, 15, 30, 0),
+                    LocalDateTime.of(2011, 6, 25, 15, 30, 0),
+                    LocalDateTime.of(2018, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeAddWithParameter() {
+    String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)";
+
+    LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 00).withNano(123456000);
+    ImmutableMap<String, Value> params =
+        ImmutableMap.of(
+            "p0",
+                Value.createDatetimeValue(
+                    CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()),
+            "p1", Value.createInt64Value(3L));
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 00).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeSub() {
+    String sql =
+        "SELECT "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+                        .build())
+                .addValues(
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(999990000),
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(990000000),
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 50),
+                    LocalDateTime.of(2008, 12, 25, 15, 20, 0),
+                    LocalDateTime.of(2008, 12, 25, 5, 30, 0),
+                    LocalDateTime.of(2008, 12, 15, 15, 30, 0),
+                    LocalDateTime.of(2008, 2, 25, 15, 30, 0),
+                    LocalDateTime.of(2006, 6, 25, 15, 30, 0),
+                    LocalDateTime.of(1998, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeDiff() {
+    String sql =
+        "SELECT DATETIME_DIFF(DATETIME '2008-12-25 15:30:00', DATETIME '2008-10-25 15:30:00', DAY)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+                .addValues(61L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeDiffNegativeResult() {
+    String sql =
+        "SELECT DATETIME_DIFF(DATETIME '2008-10-25 15:30:00', DATETIME '2008-12-25 15:30:00', DAY)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+                .addValues(-61L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeTrunc() {
+    String sql = "SELECT DATETIME_TRUNC(DATETIME '2008-12-25 15:30:00', HOUR)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_datetime_trunc", SqlTypes.DATETIME)
+                        .build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFormatDateTime() {
+    String sql = "SELECT FORMAT_DATETIME('%D %T', DATETIME '2008-12-25 15:30:00')";

Review comment:
       Added.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
##########
@@ -225,47 +226,59 @@
   public static final TestBoundedTable TABLE_EMPTY =
       TestBoundedTable.of(Schema.builder().addInt64Field("ColId").addStringField("Value").build());
 
-  private static final Schema TABLE_WTH_MAP_SCHEMA =
+  private static final Schema TABLE_WITH_MAP_SCHEMA =
       Schema.builder()
           .addMapField("map_field", FieldType.STRING, FieldType.STRING)
           .addRowField("row_field", structSchema)
           .build();
   public static final TestBoundedTable TABLE_WITH_MAP =
-      TestBoundedTable.of(TABLE_WTH_MAP_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_MAP_SCHEMA)
           .addRows(
               ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"),
               Row.withSchema(structSchema).addValues(1L, "data1").build());
 
-  private static final Schema TABLE_WTH_DATE_SCHEMA =
+  private static final Schema TABLE_WITH_DATE_SCHEMA =
       Schema.builder()
           .addLogicalTypeField("date_field", SqlTypes.DATE)
           .addStringField("str_field")
           .build();
 
   public static final TestBoundedTable TABLE_WITH_DATE =
-      TestBoundedTable.of(TABLE_WTH_DATE_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA)
           .addRows(LocalDate.of(2008, 12, 25), "s")
           .addRows(LocalDate.of(2020, 4, 7), "s");
 
-  private static final Schema TABLE_WTH_TIME_SCHEMA =
+  private static final Schema TABLE_WITH_TIME_SCHEMA =
       Schema.builder()
           .addLogicalTypeField("time_field", SqlTypes.TIME)
           .addStringField("str_field")
           .build();
 
   public static final TestBoundedTable TABLE_WITH_TIME =
-      TestBoundedTable.of(TABLE_WTH_TIME_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_TIME_SCHEMA)
           .addRows(LocalTime.of(15, 30, 0), "s")
           .addRows(LocalTime.of(23, 35, 59), "s");
 
-  private static final Schema TABLE_WTH_NUMERIC_SCHEMA =
+  private static final Schema TABLE_WITH_NUMERIC_SCHEMA =
       Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build();
+
   public static final TestBoundedTable TABLE_WITH_NUMERIC =
-      TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_NUMERIC_SCHEMA)
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3");
 
+  private static final Schema TABLE_WITH_DATETIME_SCHEMA =
+      Schema.builder()
+          .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+          .addStringField("str_field")
+          .build();
+
+  public static final TestBoundedTable TABLE_WITH_DATETIME =
+      TestBoundedTable.of(TABLE_WITH_DATETIME_SCHEMA)
+          .addRows(LocalDateTime.of(2008, 12, 25, 15, 30, 0), "s")

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
##########
@@ -258,23 +257,22 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
-

Review comment:
       Good idea. Comments added.



