You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/12 00:25:50 UTC
[beam] branch master updated: Use new ZetaSQL value create API
(#12536)
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6d7ecb2 Use new ZetaSQL value create API (#12536)
6d7ecb2 is described below
commit 6d7ecb260baff8fff4a50c323d30532c7dc0b4b3
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Tue Aug 11 17:25:36 2020 -0700
Use new ZetaSQL value create API (#12536)
---
.../sdk/extensions/sql/zetasql/DateTimeUtils.java | 3 +--
.../sql/zetasql/ZetaSqlBeamTranslationUtils.java | 21 +++++++++------------
.../sql/zetasql/ZetaSqlCalciteTranslationUtils.java | 8 +++-----
.../zetasql/ZetaSqlBeamTranslationUtilsTest.java | 14 +++++---------
.../sql/zetasql/ZetaSqlTimeFunctionsTest.java | 9 +++------
5 files changed, 21 insertions(+), 34 deletions(-)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
index eb0da97..c639c09 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
-import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.Value;
import io.grpc.Status;
import java.time.LocalTime;
@@ -166,7 +165,7 @@ public class DateTimeUtils {
public static Value parseTimeToValue(String timeString) {
LocalTime localTime = LocalTime.parse(timeString);
- return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos(localTime));
+ return Value.createTimeValue(localTime);
}
public static Value parseTimestampWithTZToValue(String timestampString) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 5a0762d..d38d597 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
import com.google.protobuf.ByteString;
import com.google.zetasql.ArrayType;
-import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.StructType;
import com.google.zetasql.StructType.StructField;
import com.google.zetasql.Type;
@@ -132,16 +131,16 @@ public final class ZetaSqlBeamTranslationUtils {
if (object instanceof Long) { // base type
return Value.createDateValue(((Long) object).intValue());
} else { // input type
- return Value.createDateValue((int) ((LocalDate) object).toEpochDay());
+ return Value.createDateValue((LocalDate) object);
}
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+ LocalTime localTime;
if (object instanceof Long) { // base type
- return Value.createTimeValue(
- CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.ofNanoOfDay((Long) object)));
+ localTime = LocalTime.ofNanoOfDay((Long) object);
} else { // input type
- return Value.createTimeValue(
- CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
+ localTime = (LocalTime) object;
}
+ return Value.createTimeValue(localTime);
} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
LocalDateTime datetime;
if (object instanceof Row) { // base type
@@ -152,9 +151,7 @@ public final class ZetaSqlBeamTranslationUtils {
} else { // input type
datetime = (LocalDateTime) object;
}
- // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
- return Value.createDatetimeValue(
- CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
+ return Value.createDatetimeValue(datetime);
} else {
throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
}
@@ -254,11 +251,11 @@ public final class ZetaSqlBeamTranslationUtils {
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
- return LocalDate.ofEpochDay(value.getDateValue());
+ return value.getLocalDateValue();
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
- return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+ return value.getLocalTimeValue();
} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
- return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+ return value.getLocalDateTimeValue();
} else {
throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 8586058..620342e 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
-import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.StructType;
import com.google.zetasql.StructType.StructField;
import com.google.zetasql.Type;
@@ -184,7 +183,7 @@ public final class ZetaSqlCalciteTranslationUtils {
for (int i = 0; i < fields.size(); i++) {
String name = fields.get(i).getName();
if ("".equals(name)) {
- name = "$col" + i; // empty field name is not allowed, generate an index-based name for it
+ name = "$col" + i; // avoid empty field names because Beam does not allow duplicate names
}
b.add(name);
}
@@ -318,14 +317,13 @@ public final class ZetaSqlCalciteTranslationUtils {
}
private static TimeString timeValueToTimeString(Value value) {
- LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+ LocalTime localTime = value.getLocalTimeValue();
return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond())
.withNanos(localTime.getNano());
}
private static TimestampString datetimeValueToTimestampString(Value value) {
- LocalDateTime dateTime =
- CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+ LocalDateTime dateTime = value.getLocalDateTimeValue();
return new TimestampString(
dateTime.getYear(),
dateTime.getMonthValue(),
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index e6e54cf..27b3c8e 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
import com.google.zetasql.ArrayType;
-import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.StructType;
import com.google.zetasql.StructType.StructField;
import com.google.zetasql.TypeFactory;
@@ -100,8 +99,8 @@ public class ZetaSqlBeamTranslationUtilsTest {
.addValue("Hello")
.addValue(new byte[] {0x11, 0x22})
.addValue(LocalDate.of(2020, 6, 4))
- .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
- .addValue(LocalTime.of(15, 30, 45))
+ .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+ .addValue(LocalTime.of(15, 30, 45).withNano(123456000))
.addValue(Instant.ofEpochMilli(12345678L))
.addArray(3.0, 6.5)
.addValue(Row.withSchema(TEST_INNER_SCHEMA).addValues(0L, "world").build())
@@ -118,13 +117,10 @@ public class ZetaSqlBeamTranslationUtilsTest {
Value.createBoolValue(false),
Value.createStringValue("Hello"),
Value.createBytesValue(ByteString.copyFrom(new byte[] {0x11, 0x22})),
- Value.createDateValue((int) LocalDate.of(2020, 6, 4).toEpochDay()),
+ Value.createDateValue(LocalDate.of(2020, 6, 4)),
Value.createDatetimeValue(
- CivilTimeEncoder.encodePacked64DatetimeSeconds(
- LocalDateTime.of(2008, 12, 25, 15, 30, 0)),
- LocalDateTime.of(2008, 12, 25, 15, 30, 0).getNano()),
- Value.createTimeValue(
- CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.of(15, 30, 45))),
+ LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000)),
+ Value.createTimeValue(LocalTime.of(15, 30, 45).withNano(123456000)),
Value.createTimestampValueFromUnixMicros(12345678000L),
Value.createArrayValue(
TEST_INNER_ARRAY_TYPE,
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
index 109ca1e..ca37715 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTime
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone;
-import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.time.LocalDate;
@@ -1049,12 +1048,10 @@ public class ZetaSqlTimeFunctionsTest extends ZetaSqlTestBase {
public void testDateTimeAddWithParameter() {
String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)";
- LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 00).withNano(123456000);
+ LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000);
ImmutableMap<String, Value> params =
ImmutableMap.of(
- "p0",
- Value.createDatetimeValue(
- CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()),
+ "p0", Value.createDatetimeValue(datetime),
"p1", Value.createInt64Value(3L));
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
@@ -1065,7 +1062,7 @@ public class ZetaSqlTimeFunctionsTest extends ZetaSqlTestBase {
.containsInAnyOrder(
Row.withSchema(
Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
- .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 00).withNano(123456000))
+ .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 0).withNano(123456000))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}