You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/24 22:28:29 UTC

[GitHub] [beam] robinyqiu commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

robinyqiu commented on a change in pull request #12054:
URL: https://github.com/apache/beam/pull/12054#discussion_r444548460



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Time.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+
+public class Time implements Schema.LogicalType<LocalTime, Long> {

Review comment:
       Please add a class-level Javadoc for this class (see `DATE` for an example).

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -315,7 +317,7 @@ private static Expression castOutput(Expression value, FieldType toType) {
   private static Expression castOutputTime(Expression value, FieldType toType) {
     Expression valueDateTime = value;
 
-    // First, convert to millis (except for DATE type)
+    // First, convert to millis (except for DATE/TIME type)

Review comment:
       It seems we can now combine the first and second steps (and update the comments accordingly). The code will be much simpler that way.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+  // TIME type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testTimeLiteral() {
+    String sql = "SELECT TIME '15:30:00'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build())
+                .addValues(LocalTime.of(15, 30, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTimeColumn() {
+    String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("15:30:00")
+                .build(),
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("23:35:59")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_TIME function ("SELECT CURRENT_TIME()")
+
+  @Test
+  public void testExtractTime() {
+    String sql =
+        "SELECT "
+            + "EXTRACT(HOUR FROM TIME '15:30:35') as hour, "
+            + "EXTRACT(MINUTE FROM TIME '15:30:35') as minute, "
+            + "EXTRACT(SECOND FROM TIME '15:30:35') as second, "
+            + "EXTRACT(MILLISECOND FROM TIME '15:30:35') as millisecond, "
+            + "EXTRACT(MICROSECOND FROM TIME '15:30:35') as microsecond ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder()
+            .addField("hour", FieldType.INT64)
+            .addField("minute", FieldType.INT64)
+            .addField("second", FieldType.INT64)
+            .addField("millisecond", FieldType.INT64)
+            .addField("microsecond", FieldType.INT64)
+            .build();
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(schema).addValues(15L, 30L, 35L, 0L, 0L).build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTimeFromHourMinuteSecond() {
+    String sql = "SELECT TIME(15, 30, 0)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build())
+                .addValues(LocalTime.of(15, 30, 0, 0))

Review comment:
       Nit: you can omit the last `0` parameter (same below)

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+  // TIME type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testTimeLiteral() {
+    String sql = "SELECT TIME '15:30:00'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build())
+                .addValues(LocalTime.of(15, 30, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTimeColumn() {
+    String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("15:30:00")
+                .build(),
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("23:35:59")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_TIME function ("SELECT CURRENT_TIME()")
+
+  @Test
+  public void testExtractTime() {
+    String sql =
+        "SELECT "
+            + "EXTRACT(HOUR FROM TIME '15:30:35') as hour, "
+            + "EXTRACT(MINUTE FROM TIME '15:30:35') as minute, "
+            + "EXTRACT(SECOND FROM TIME '15:30:35') as second, "
+            + "EXTRACT(MILLISECOND FROM TIME '15:30:35') as millisecond, "
+            + "EXTRACT(MICROSECOND FROM TIME '15:30:35') as microsecond ";

Review comment:
       Also, does `EXTRACT(NANOSECOND ...)` work?

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##########
@@ -375,38 +376,6 @@ public void testNullInnerRow() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
 
-  private static class DummySqlTimeType implements Schema.LogicalType<Long, Instant> {
-    @Override
-    public String getIdentifier() {
-      return "SqlTimeType";
-    }
-
-    @Override
-    public FieldType getArgumentType() {
-      return FieldType.STRING;
-    }
-
-    @Override
-    public String getArgument() {
-      return "";
-    }
-
-    @Override
-    public Schema.FieldType getBaseType() {
-      return Schema.FieldType.DATETIME;
-    }
-
-    @Override
-    public Instant toBaseType(Long input) {
-      return (input == null ? null : new Instant((long) input));
-    }
-
-    @Override
-    public Long toInputType(Instant base) {
-      return (base == null ? null : base.getMillis());
-    }
-  }
-
   @Test
   public void testNullDatetimeFields() {

Review comment:
       Please clean up the test a bit more; unused local variables can be removed. I would also like to add an additional field for something like `dateTypeField + INTERVAL 1 day`.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -431,11 +443,9 @@ private static Expression value(
     private static Expression value(Expression value, Schema.FieldType type) {
       if (type.getTypeName().isLogicalType()) {
         String logicalId = type.getLogicalType().getIdentifier();
-        if (TimeType.IDENTIFIER.equals(logicalId)) {
-          return nullOr(
-              value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class));
-        } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
-          value = nullOr(value, value);
+        if (SqlTypes.TIME.getIdentifier().equals(logicalId)
+            || SqlTypes.DATE.getIdentifier().equals(logicalId)) {
+          return value;

Review comment:
       For TIME it should be `nullOr(value, Expressions.divide(value, 1000000L))`.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+  // TIME type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testTimeLiteral() {
+    String sql = "SELECT TIME '15:30:00'";

Review comment:
       Can we test a time that has sub-second components? I wonder whether a `TIME` literal with microsecond precision works, because it has to go through the "ZetaSQL -> Calcite -> Beam" code path.

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
##########
@@ -65,7 +66,7 @@ public void encodeAndDecode() throws Exception {
                 1.1,
                 BigDecimal.ZERO,
                 "hello",
-                DateTime.now().toInstant(),
+                LocalTime.now(),

Review comment:
       Could you add a field for `DATE` as well? Thanks.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##########
@@ -19,18 +19,19 @@
 
 import com.google.protobuf.ByteString;
 import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
 import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
 import java.time.LocalDate;
+import java.time.LocalTime;

Review comment:
       Please update `ZetaSqlUtilsTest.java` for the new TIME type. We should also rename that file; we forgot to do that in the last PR.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+  // TIME type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testTimeLiteral() {
+    String sql = "SELECT TIME '15:30:00'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build())
+                .addValues(LocalTime.of(15, 30, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTimeColumn() {
+    String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("15:30:00")
+                .build(),
+            Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+                .addValues("23:35:59")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_TIME function ("SELECT CURRENT_TIME()")
+
+  @Test
+  public void testExtractTime() {
+    String sql =
+        "SELECT "
+            + "EXTRACT(HOUR FROM TIME '15:30:35') as hour, "
+            + "EXTRACT(MINUTE FROM TIME '15:30:35') as minute, "
+            + "EXTRACT(SECOND FROM TIME '15:30:35') as second, "
+            + "EXTRACT(MILLISECOND FROM TIME '15:30:35') as millisecond, "
+            + "EXTRACT(MICROSECOND FROM TIME '15:30:35') as microsecond ";

Review comment:
       Could we test extracting from a time that actually has a microsecond subcomponent, like `'15:30:35.123456789'`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org