Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/08 02:34:00 UTC

[GitHub] [beam] robinyqiu opened a new pull request #12507: [WIP] Move value conversion logic out of ExpressionConverter

robinyqiu opened a new pull request #12507:
URL: https://github.com/apache/beam/pull/12507


   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12507:
URL: https://github.com/apache/beam/pull/12507#discussion_r468095301



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
##########
@@ -17,174 +17,333 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
-import static java.util.stream.Collectors.toList;
-
-import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
+import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
+import com.google.zetasql.functions.ZetaSQLDateTime.DateTimestampPart;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.List;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
+import org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /**
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
- * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
+ * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM (internal), PROTO, GEOGRAPHY
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
 
   private ZetaSqlCalciteTranslationUtils() {}
 
+  // TODO[BEAM-9178]: support DateTimestampPart.WEEK and "WEEK with weekday"s
+  private static final ImmutableMap<Integer, TimeUnit> TIME_UNIT_CASTING_MAP =
+      ImmutableMap.<Integer, TimeUnit>builder()
+          .put(DateTimestampPart.YEAR.getNumber(), TimeUnit.YEAR)
+          .put(DateTimestampPart.MONTH.getNumber(), TimeUnit.MONTH)
+          .put(DateTimestampPart.DAY.getNumber(), TimeUnit.DAY)
+          .put(DateTimestampPart.DAYOFWEEK.getNumber(), TimeUnit.DOW)
+          .put(DateTimestampPart.DAYOFYEAR.getNumber(), TimeUnit.DOY)
+          .put(DateTimestampPart.QUARTER.getNumber(), TimeUnit.QUARTER)
+          .put(DateTimestampPart.HOUR.getNumber(), TimeUnit.HOUR)
+          .put(DateTimestampPart.MINUTE.getNumber(), TimeUnit.MINUTE)
+          .put(DateTimestampPart.SECOND.getNumber(), TimeUnit.SECOND)
+          .put(DateTimestampPart.MILLISECOND.getNumber(), TimeUnit.MILLISECOND)
+          .put(DateTimestampPart.MICROSECOND.getNumber(), TimeUnit.MICROSECOND)
+          .put(DateTimestampPart.ISOYEAR.getNumber(), TimeUnit.ISOYEAR)
+          .put(DateTimestampPart.ISOWEEK.getNumber(), TimeUnit.WEEK)
+          .build();
+
   // Type conversion: Calcite => ZetaSQL
-  public static Type toZetaType(RelDataType calciteType) {
+  public static Type toZetaSqlType(RelDataType calciteType) {
     switch (calciteType.getSqlTypeName()) {
       case BIGINT:
-        return TypeFactory.createSimpleType(TYPE_INT64);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
       case DOUBLE:
-        return TypeFactory.createSimpleType(TYPE_DOUBLE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DOUBLE);
       case BOOLEAN:
-        return TypeFactory.createSimpleType(TYPE_BOOL);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BOOL);
       case VARCHAR:
-        return TypeFactory.createSimpleType(TYPE_STRING);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
       case VARBINARY:
-        return TypeFactory.createSimpleType(TYPE_BYTES);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
       case DECIMAL:
-        return TypeFactory.createSimpleType(TYPE_NUMERIC);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC);
       case DATE:
-        return TypeFactory.createSimpleType(TYPE_DATE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
       case TIME:
-        return TypeFactory.createSimpleType(TYPE_TIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return TypeFactory.createSimpleType(TYPE_DATETIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
       case TIMESTAMP:
-        return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
       case ARRAY:
-        return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
+        return TypeFactory.createArrayType(toZetaSqlType(calciteType.getComponentType()));
       case ROW:
-        List<StructField> structFields =
+        return TypeFactory.createStructType(
             calciteType.getFieldList().stream()
-                .map(f -> new StructField(f.getName(), toZetaType(f.getType())))
-                .collect(toList());
-
-        return TypeFactory.createStructType(structFields);
+                .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
+                .collect(Collectors.toList()));
       default:
-        throw new UnsupportedOperationException("Unsupported RelDataType: " + calciteType);
+        throw new UnsupportedOperationException(
+            "Unknown Calcite type: " + calciteType.getSqlTypeName().getName());
     }
   }
 
   // Type conversion: ZetaSQL => Calcite
-  public static SqlTypeName toCalciteTypeName(TypeKind type) {
-    switch (type) {
+  public static RelDataType toCalciteType(Type type, boolean nullable, RexBuilder rexBuilder) {
+    RelDataType nonNullType;
+    switch (type.getKind()) {
       case TYPE_INT64:
-        return SqlTypeName.BIGINT;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+        break;
       case TYPE_DOUBLE:
-        return SqlTypeName.DOUBLE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+        break;
       case TYPE_BOOL:
-        return SqlTypeName.BOOLEAN;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN);
+        break;
       case TYPE_STRING:
-        return SqlTypeName.VARCHAR;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+        break;
       case TYPE_BYTES:
-        return SqlTypeName.VARBINARY;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
+        break;
       case TYPE_NUMERIC:
-        return SqlTypeName.DECIMAL;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DECIMAL);
+        break;
       case TYPE_DATE:
-        return SqlTypeName.DATE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DATE);
+        break;
       case TYPE_TIME:
-        return SqlTypeName.TIME;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIME);
+        break;
       case TYPE_DATETIME:
-        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        nonNullType =
+            rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        break;
       case TYPE_TIMESTAMP:
-        // TODO: handle timestamp with time zone.
-        return SqlTypeName.TIMESTAMP;
-        // TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      case TYPE_ARRAY:
+        // TODO: Should element type has the same nullability as the array type?
+        nonNullType = toCalciteArrayType(type.asArray().getElementType(), nullable, rexBuilder);
+        break;
+      case TYPE_STRUCT:
+        // TODO: Should field type has the same nullability as the struct type?
+        nonNullType = toCalciteStructType(type.asStruct(), nullable, rexBuilder);
+        break;
       default:
-        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.name());
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind().name());
     }
+    return rexBuilder.getTypeFactory().createTypeWithNullability(nonNullType, nullable);
   }
 
-  public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) {
-    if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
-      return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
-    } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
-      return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
-    } else {
-      // TODO: Check type's nullability?
-      return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
-    }
+  private static RelDataType toCalciteArrayType(
+      Type elementType, boolean nullable, RexBuilder rexBuilder) {
+    return rexBuilder
+        .getTypeFactory()
+        // -1 cardinality means unlimited array size
+        .createArrayType(toCalciteType(elementType, nullable, rexBuilder), -1);
   }
 
-  public static RelDataType toArrayRelDataType(
-      RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
-    // -1 cardinality means unlimited array size.
-    // TODO: is unlimited array size right for general case?
-    // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
-    return nullable(
-        rexBuilder,
-        rexBuilder
-            .getTypeFactory()
-            .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
-        isNullable);
+  private static RelDataType toCalciteStructType(
+      StructType structType, boolean nullable, RexBuilder rexBuilder) {
+    List<StructField> fields = structType.getFieldList();
+    List<String> fieldNames = getFieldNameList(fields);
+    List<RelDataType> fieldTypes =
+        fields.stream()
+            .map(f -> toCalciteType(f.getType(), nullable, rexBuilder))
+            .collect(Collectors.toList());
+    return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
   }
 
-  private static List<String> toNameList(List<StructField> fields) {
+  private static List<String> getFieldNameList(List<StructField> fields) {
     ImmutableList.Builder<String> b = ImmutableList.builder();
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i;
+        name = "$col" + i; // empty field name is not allowed, generate an index-based name for it

Review comment:
       Yeah, this comment is not accurate. The real problem is that Beam does not allow multiple columns with the same name (the empty string in this case). Thanks for spotting this.
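
A minimal sketch of the point made in the comment above, in plain Java with no Beam or ZetaSQL dependencies. The class name `FieldNameSketch` and the helpers `fillEmptyNames` and `allDistinct` are hypothetical and exist only for this illustration; they are not part of the PR. The sketch assumes the only requirement is that field names be distinct, and shows that two unnamed fields collide until each gets an index-based name like the `"$col" + i` in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of the Beam code base.
class FieldNameSketch {

  // Replaces each empty name with "$col" + its position, mirroring the loop in the diff.
  static List<String> fillEmptyNames(List<String> names) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) {
      String name = names.get(i);
      result.add(name.isEmpty() ? "$col" + i : name);
    }
    return result;
  }

  // Returns true if no name occurs more than once.
  static boolean allDistinct(List<String> names) {
    Set<String> seen = new HashSet<>(names);
    return seen.size() == names.size();
  }

  public static void main(String[] args) {
    // Two unnamed fields share the empty name, so the names are not distinct.
    List<String> raw = Arrays.asList("id", "", "");
    System.out.println(allDistinct(raw)); // prints "false"

    // After substitution every field has its own name.
    List<String> filled = fillEmptyNames(raw);
    System.out.println(filled); // prints "[id, $col1, $col2]"
    System.out.println(allDistinct(filled)); // prints "true"
  }
}
```

Note that this scheme only removes collisions between empty names. A field that is explicitly named `$col1` could still clash with a generated name, and the sketch does not handle that case.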







[GitHub] [beam] apilloud commented on a change in pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #12507:
URL: https://github.com/apache/beam/pull/12507#discussion_r468087411



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
##########
@@ -17,174 +17,333 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
-import static java.util.stream.Collectors.toList;
-
-import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
+import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
+import com.google.zetasql.functions.ZetaSQLDateTime.DateTimestampPart;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.List;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
+import org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /**
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
- * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
+ * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM (internal), PROTO, GEOGRAPHY
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
 
   private ZetaSqlCalciteTranslationUtils() {}
 
+  // TODO[BEAM-9178]: support DateTimestampPart.WEEK and "WEEK with weekday"s
+  private static final ImmutableMap<Integer, TimeUnit> TIME_UNIT_CASTING_MAP =
+      ImmutableMap.<Integer, TimeUnit>builder()
+          .put(DateTimestampPart.YEAR.getNumber(), TimeUnit.YEAR)
+          .put(DateTimestampPart.MONTH.getNumber(), TimeUnit.MONTH)
+          .put(DateTimestampPart.DAY.getNumber(), TimeUnit.DAY)
+          .put(DateTimestampPart.DAYOFWEEK.getNumber(), TimeUnit.DOW)
+          .put(DateTimestampPart.DAYOFYEAR.getNumber(), TimeUnit.DOY)
+          .put(DateTimestampPart.QUARTER.getNumber(), TimeUnit.QUARTER)
+          .put(DateTimestampPart.HOUR.getNumber(), TimeUnit.HOUR)
+          .put(DateTimestampPart.MINUTE.getNumber(), TimeUnit.MINUTE)
+          .put(DateTimestampPart.SECOND.getNumber(), TimeUnit.SECOND)
+          .put(DateTimestampPart.MILLISECOND.getNumber(), TimeUnit.MILLISECOND)
+          .put(DateTimestampPart.MICROSECOND.getNumber(), TimeUnit.MICROSECOND)
+          .put(DateTimestampPart.ISOYEAR.getNumber(), TimeUnit.ISOYEAR)
+          .put(DateTimestampPart.ISOWEEK.getNumber(), TimeUnit.WEEK)
+          .build();
+
   // Type conversion: Calcite => ZetaSQL
-  public static Type toZetaType(RelDataType calciteType) {
+  public static Type toZetaSqlType(RelDataType calciteType) {
     switch (calciteType.getSqlTypeName()) {
       case BIGINT:
-        return TypeFactory.createSimpleType(TYPE_INT64);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
       case DOUBLE:
-        return TypeFactory.createSimpleType(TYPE_DOUBLE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DOUBLE);
       case BOOLEAN:
-        return TypeFactory.createSimpleType(TYPE_BOOL);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BOOL);
       case VARCHAR:
-        return TypeFactory.createSimpleType(TYPE_STRING);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
       case VARBINARY:
-        return TypeFactory.createSimpleType(TYPE_BYTES);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
       case DECIMAL:
-        return TypeFactory.createSimpleType(TYPE_NUMERIC);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC);
       case DATE:
-        return TypeFactory.createSimpleType(TYPE_DATE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
       case TIME:
-        return TypeFactory.createSimpleType(TYPE_TIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return TypeFactory.createSimpleType(TYPE_DATETIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
       case TIMESTAMP:
-        return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
       case ARRAY:
-        return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
+        return TypeFactory.createArrayType(toZetaSqlType(calciteType.getComponentType()));
       case ROW:
-        List<StructField> structFields =
+        return TypeFactory.createStructType(
             calciteType.getFieldList().stream()
-                .map(f -> new StructField(f.getName(), toZetaType(f.getType())))
-                .collect(toList());
-
-        return TypeFactory.createStructType(structFields);
+                .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
+                .collect(Collectors.toList()));
       default:
-        throw new UnsupportedOperationException("Unsupported RelDataType: " + calciteType);
+        throw new UnsupportedOperationException(
+            "Unknown Calcite type: " + calciteType.getSqlTypeName().getName());
     }
   }
 
   // Type conversion: ZetaSQL => Calcite
-  public static SqlTypeName toCalciteTypeName(TypeKind type) {
-    switch (type) {
+  public static RelDataType toCalciteType(Type type, boolean nullable, RexBuilder rexBuilder) {
+    RelDataType nonNullType;
+    switch (type.getKind()) {
       case TYPE_INT64:
-        return SqlTypeName.BIGINT;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+        break;
       case TYPE_DOUBLE:
-        return SqlTypeName.DOUBLE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+        break;
       case TYPE_BOOL:
-        return SqlTypeName.BOOLEAN;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN);
+        break;
       case TYPE_STRING:
-        return SqlTypeName.VARCHAR;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+        break;
       case TYPE_BYTES:
-        return SqlTypeName.VARBINARY;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
+        break;
       case TYPE_NUMERIC:
-        return SqlTypeName.DECIMAL;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DECIMAL);
+        break;
       case TYPE_DATE:
-        return SqlTypeName.DATE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DATE);
+        break;
       case TYPE_TIME:
-        return SqlTypeName.TIME;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIME);
+        break;
       case TYPE_DATETIME:
-        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        nonNullType =
+            rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        break;
       case TYPE_TIMESTAMP:
-        // TODO: handle timestamp with time zone.
-        return SqlTypeName.TIMESTAMP;
-        // TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      case TYPE_ARRAY:
+        // TODO: Should element type has the same nullability as the array type?
+        nonNullType = toCalciteArrayType(type.asArray().getElementType(), nullable, rexBuilder);
+        break;
+      case TYPE_STRUCT:
+        // TODO: Should field type has the same nullability as the struct type?
+        nonNullType = toCalciteStructType(type.asStruct(), nullable, rexBuilder);
+        break;
       default:
-        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.name());
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind().name());
     }
+    return rexBuilder.getTypeFactory().createTypeWithNullability(nonNullType, nullable);
   }
 
-  public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) {
-    if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
-      return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
-    } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
-      return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
-    } else {
-      // TODO: Check type's nullability?
-      return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
-    }
+  private static RelDataType toCalciteArrayType(
+      Type elementType, boolean nullable, RexBuilder rexBuilder) {
+    return rexBuilder
+        .getTypeFactory()
+        // -1 cardinality means unlimited array size
+        .createArrayType(toCalciteType(elementType, nullable, rexBuilder), -1);
   }
 
-  public static RelDataType toArrayRelDataType(
-      RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
-    // -1 cardinality means unlimited array size.
-    // TODO: is unlimited array size right for general case?
-    // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
-    return nullable(
-        rexBuilder,
-        rexBuilder
-            .getTypeFactory()
-            .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
-        isNullable);
+  private static RelDataType toCalciteStructType(
+      StructType structType, boolean nullable, RexBuilder rexBuilder) {
+    List<StructField> fields = structType.getFieldList();
+    List<String> fieldNames = getFieldNameList(fields);
+    List<RelDataType> fieldTypes =
+        fields.stream()
+            .map(f -> toCalciteType(f.getType(), nullable, rexBuilder))
+            .collect(Collectors.toList());
+    return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
   }
 
-  private static List<String> toNameList(List<StructField> fields) {
+  private static List<String> getFieldNameList(List<StructField> fields) {
     ImmutableList.Builder<String> b = ImmutableList.builder();
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i;
+        name = "$col" + i; // empty field name is not allowed, generate an index-based name for it

Review comment:
       I believe a single column with an empty name is allowed. ZetaSQL allows multiple columns without names.
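
   For illustration, here is a minimal standalone sketch of the index-based naming done by the loop in the diff above. It uses plain strings in place of ZetaSQL's StructField, and the class and method names (FieldNameSketch, fillEmptyNames) are hypothetical, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; the PR's method is getFieldNameList.
class FieldNameSketch {
  // Replaces each empty name with "$col" + its position, mirroring the loop in the diff.
  static List<String> fillEmptyNames(List<String> names) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) {
      String name = names.get(i);
      result.add(name.isEmpty() ? "$col" + i : name);
    }
    return result;
  }

  public static void main(String[] args) {
    // Two unnamed columns in one struct.
    System.out.println(fillEmptyNames(Arrays.asList("a", "", "b", "")));
    // prints [a, $col1, b, $col3]
  }
}
```

   Because the generated name is derived from the field index, several unnamed columns each receive a distinct name. A field explicitly named "$col1" could still collide with a generated one; the sketch does not guard against that.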




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12507:
URL: https://github.com/apache/beam/pull/12507#discussion_r469459270



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
##########
@@ -17,174 +17,333 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
-import static java.util.stream.Collectors.toList;
-
-import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
+import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
+import com.google.zetasql.functions.ZetaSQLDateTime.DateTimestampPart;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.List;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
+import org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /**
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
- * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
+ * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM (internal), PROTO, GEOGRAPHY
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
 
   private ZetaSqlCalciteTranslationUtils() {}
 
+  // TODO[BEAM-9178]: support DateTimestampPart.WEEK and "WEEK with weekday"s
+  private static final ImmutableMap<Integer, TimeUnit> TIME_UNIT_CASTING_MAP =
+      ImmutableMap.<Integer, TimeUnit>builder()
+          .put(DateTimestampPart.YEAR.getNumber(), TimeUnit.YEAR)
+          .put(DateTimestampPart.MONTH.getNumber(), TimeUnit.MONTH)
+          .put(DateTimestampPart.DAY.getNumber(), TimeUnit.DAY)
+          .put(DateTimestampPart.DAYOFWEEK.getNumber(), TimeUnit.DOW)
+          .put(DateTimestampPart.DAYOFYEAR.getNumber(), TimeUnit.DOY)
+          .put(DateTimestampPart.QUARTER.getNumber(), TimeUnit.QUARTER)
+          .put(DateTimestampPart.HOUR.getNumber(), TimeUnit.HOUR)
+          .put(DateTimestampPart.MINUTE.getNumber(), TimeUnit.MINUTE)
+          .put(DateTimestampPart.SECOND.getNumber(), TimeUnit.SECOND)
+          .put(DateTimestampPart.MILLISECOND.getNumber(), TimeUnit.MILLISECOND)
+          .put(DateTimestampPart.MICROSECOND.getNumber(), TimeUnit.MICROSECOND)
+          .put(DateTimestampPart.ISOYEAR.getNumber(), TimeUnit.ISOYEAR)
+          .put(DateTimestampPart.ISOWEEK.getNumber(), TimeUnit.WEEK)
+          .build();
+
   // Type conversion: Calcite => ZetaSQL
-  public static Type toZetaType(RelDataType calciteType) {
+  public static Type toZetaSqlType(RelDataType calciteType) {
     switch (calciteType.getSqlTypeName()) {
       case BIGINT:
-        return TypeFactory.createSimpleType(TYPE_INT64);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
       case DOUBLE:
-        return TypeFactory.createSimpleType(TYPE_DOUBLE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DOUBLE);
       case BOOLEAN:
-        return TypeFactory.createSimpleType(TYPE_BOOL);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BOOL);
       case VARCHAR:
-        return TypeFactory.createSimpleType(TYPE_STRING);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
       case VARBINARY:
-        return TypeFactory.createSimpleType(TYPE_BYTES);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
       case DECIMAL:
-        return TypeFactory.createSimpleType(TYPE_NUMERIC);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC);
       case DATE:
-        return TypeFactory.createSimpleType(TYPE_DATE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
       case TIME:
-        return TypeFactory.createSimpleType(TYPE_TIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return TypeFactory.createSimpleType(TYPE_DATETIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
       case TIMESTAMP:
-        return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
       case ARRAY:
-        return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
+        return TypeFactory.createArrayType(toZetaSqlType(calciteType.getComponentType()));
       case ROW:
-        List<StructField> structFields =
+        return TypeFactory.createStructType(
             calciteType.getFieldList().stream()
-                .map(f -> new StructField(f.getName(), toZetaType(f.getType())))
-                .collect(toList());
-
-        return TypeFactory.createStructType(structFields);
+                .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
+                .collect(Collectors.toList()));
       default:
-        throw new UnsupportedOperationException("Unsupported RelDataType: " + calciteType);
+        throw new UnsupportedOperationException(
+            "Unknown Calcite type: " + calciteType.getSqlTypeName().getName());
     }
   }
 
   // Type conversion: ZetaSQL => Calcite
-  public static SqlTypeName toCalciteTypeName(TypeKind type) {
-    switch (type) {
+  public static RelDataType toCalciteType(Type type, boolean nullable, RexBuilder rexBuilder) {
+    RelDataType nonNullType;
+    switch (type.getKind()) {
       case TYPE_INT64:
-        return SqlTypeName.BIGINT;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+        break;
       case TYPE_DOUBLE:
-        return SqlTypeName.DOUBLE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+        break;
       case TYPE_BOOL:
-        return SqlTypeName.BOOLEAN;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN);
+        break;
       case TYPE_STRING:
-        return SqlTypeName.VARCHAR;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+        break;
       case TYPE_BYTES:
-        return SqlTypeName.VARBINARY;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
+        break;
       case TYPE_NUMERIC:
-        return SqlTypeName.DECIMAL;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DECIMAL);
+        break;
       case TYPE_DATE:
-        return SqlTypeName.DATE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DATE);
+        break;
       case TYPE_TIME:
-        return SqlTypeName.TIME;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIME);
+        break;
       case TYPE_DATETIME:
-        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        nonNullType =
+            rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        break;
       case TYPE_TIMESTAMP:
-        // TODO: handle timestamp with time zone.
-        return SqlTypeName.TIMESTAMP;
-        // TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      case TYPE_ARRAY:
+        // TODO: Should element type has the same nullability as the array type?
+        nonNullType = toCalciteArrayType(type.asArray().getElementType(), nullable, rexBuilder);
+        break;
+      case TYPE_STRUCT:
+        // TODO: Should field type has the same nullability as the struct type?
+        nonNullType = toCalciteStructType(type.asStruct(), nullable, rexBuilder);
+        break;
       default:
-        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.name());
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind().name());
     }
+    return rexBuilder.getTypeFactory().createTypeWithNullability(nonNullType, nullable);
   }
 
-  public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) {
-    if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
-      return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
-    } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
-      return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
-    } else {
-      // TODO: Check type's nullability?
-      return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
-    }
+  private static RelDataType toCalciteArrayType(
+      Type elementType, boolean nullable, RexBuilder rexBuilder) {
+    return rexBuilder
+        .getTypeFactory()
+        // -1 cardinality means unlimited array size
+        .createArrayType(toCalciteType(elementType, nullable, rexBuilder), -1);
   }
 
-  public static RelDataType toArrayRelDataType(
-      RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
-    // -1 cardinality means unlimited array size.
-    // TODO: is unlimited array size right for general case?
-    // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
-    return nullable(
-        rexBuilder,
-        rexBuilder
-            .getTypeFactory()
-            .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
-        isNullable);
+  private static RelDataType toCalciteStructType(
+      StructType structType, boolean nullable, RexBuilder rexBuilder) {
+    List<StructField> fields = structType.getFieldList();
+    List<String> fieldNames = getFieldNameList(fields);
+    List<RelDataType> fieldTypes =
+        fields.stream()
+            .map(f -> toCalciteType(f.getType(), nullable, rexBuilder))
+            .collect(Collectors.toList());
+    return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
   }
 
-  private static List<String> toNameList(List<StructField> fields) {
+  private static List<String> getFieldNameList(List<StructField> fields) {
     ImmutableList.Builder<String> b = ImmutableList.builder();
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i;
+        name = "$col" + i; // empty field name is not allowed, generate an index-based name for it
       }
       b.add(name);
     }
     return b.build();
   }
 
-  public static RelDataType toStructRelDataType(
-      RexBuilder rexBuilder, StructType structType, boolean isNullable) {
+  // Value conversion: ZetaSQL => Calcite
+  public static RexNode toRexNode(Value value, RexBuilder rexBuilder) {
+    Type type = value.getType();
+    if (value.isNull()) {
+      return rexBuilder.makeNullLiteral(toCalciteType(type, true, rexBuilder));
+    }
 
-    List<StructField> fields = structType.getFieldList();
-    List<String> fieldNames = toNameList(fields);
-    List<RelDataType> fieldTypes =
-        fields.stream()
-            .map(f -> toRelDataType(rexBuilder, f.getType(), isNullable))
-            .collect(toList());
+    switch (type.getKind()) {
+      case TYPE_INT64:
+        return rexBuilder.makeExactLiteral(
+            new BigDecimal(value.getInt64Value()), toCalciteType(type, false, rexBuilder));
+      case TYPE_DOUBLE:
+        // Cannot simply call makeApproxLiteral() for ZetaSQL DOUBLE type because positive infinity,
+        // negative infinity and NaN cannot be directly converted to BigDecimal. So we create three
+        // wrapper functions here for these three cases such that we can later recognize it and
+        // customize its unparsing in BeamBigQuerySqlDialect.
+        double val = value.getDoubleValue();
+        String wrapperFun = null;
+        if (val == Double.POSITIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_FUNCTION;
+        } else if (val == Double.NEGATIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_FUNCTION;
+        } else if (Double.isNaN(val)) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION;
+        }
 
-    return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
+        RelDataType returnType = toCalciteType(type, false, rexBuilder);
+        if (wrapperFun == null) {
+          return rexBuilder.makeApproxLiteral(new BigDecimal(val), returnType);
+        } else if (BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION.equals(wrapperFun)) {
+          // TODO[BEAM-10550]: Update the temporary workaround below after vendored Calcite version.
+          // Adding an additional random parameter for the wrapper function of NaN, to avoid
+          // triggering Calcite operation simplification. (e.g. 'NaN == NaN' would be simplify to
+          // 'null or NaN is not null' in Calcite. This would miscalculate the expression to be
+          // true, which should be false.)
+          return rexBuilder.makeCall(
+              SqlOperators.createZetaSqlFunction(wrapperFun, returnType.getSqlTypeName()),
+              rexBuilder.makeApproxLiteral(BigDecimal.valueOf(Math.random()), returnType));
+        } else {
+          return rexBuilder.makeCall(
+              SqlOperators.createZetaSqlFunction(wrapperFun, returnType.getSqlTypeName()));
+        }
+      case TYPE_BOOL:
+        return rexBuilder.makeLiteral(value.getBoolValue());
+      case TYPE_STRING:
+        // Has to allow CAST because Calcite create CHAR type first and does a CAST to VARCHAR.
+        // If not allow cast, rexBuilder() will only build a literal with CHAR type.
+        return rexBuilder.makeLiteral(
+            value.getStringValue(), toCalciteType(type, false, rexBuilder), true);
+      case TYPE_BYTES:
+        return rexBuilder.makeBinaryLiteral(new ByteString(value.getBytesValue().toByteArray()));
+      case TYPE_NUMERIC:
+        // Cannot simply call makeExactLiteral() for ZetaSQL NUMERIC type because later it will be
+        // unparsed to the string representation of the BigDecimal itself (e.g. "SELECT NUMERIC '0'"
+        // will be unparsed to "SELECT 0E-9"), and Calcite does not allow customize unparsing of
+        // SqlNumericLiteral. So we create a wrapper function here such that we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        return rexBuilder.makeCall(
+            SqlOperators.createZetaSqlFunction(
+                BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
+                toCalciteType(type, false, rexBuilder).getSqlTypeName()),
+            rexBuilder.makeExactLiteral(
+                value.getNumericValue(), toCalciteType(type, false, rexBuilder)));
+      case TYPE_DATE:
+        return rexBuilder.makeDateLiteral(dateValueToDateString(value));
+      case TYPE_TIME:
+        return rexBuilder.makeTimeLiteral(
+            timeValueToTimeString(value),
+            rexBuilder.getTypeFactory().getTypeSystem().getMaxPrecision(SqlTypeName.TIME));
+      case TYPE_DATETIME:
+        return rexBuilder.makeTimestampWithLocalTimeZoneLiteral(
+            datetimeValueToTimestampString(value),
+            rexBuilder
+                .getTypeFactory()
+                .getTypeSystem()
+                .getMaxPrecision(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE));
+      case TYPE_TIMESTAMP:
+        return rexBuilder.makeTimestampLiteral(
+            timestampValueToTimestampString(value),
+            rexBuilder.getTypeFactory().getTypeSystem().getMaxPrecision(SqlTypeName.TIMESTAMP));
+      case TYPE_ARRAY:
+        return arrayValueToRexNode(value, rexBuilder);
+      case TYPE_STRUCT:
+        return structValueToRexNode(value, rexBuilder);
+      case TYPE_ENUM: // internal only, used for DateTimestampPart
+        return enumValueToRexNode(value, rexBuilder);
+      default:
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind().name());
+    }
+  }
+
+  private static RexNode arrayValueToRexNode(Value value, RexBuilder rexBuilder) {
+    return rexBuilder.makeCall(
+        toCalciteArrayType(value.getType().asArray().getElementType(), false, rexBuilder),
+        SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+        value.getElementList().stream()
+            .map(v -> toRexNode(v, rexBuilder))
+            .collect(Collectors.toList()));
+  }
+
+  private static RexNode structValueToRexNode(Value value, RexBuilder rexBuilder) {
+    return rexBuilder.makeCall(
+        toCalciteStructType(value.getType().asStruct(), false, rexBuilder),

Review comment:
       This line specifies the STRUCT return type explicitly, so Calcite does not need to infer it. Adding this line introduced a bug: later, when the schema is translated to a Beam Schema, duplicate field names are caught. https://github.com/apache/beam/pull/12550 fixes this bug.
   
   @apilloud 
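
   As a rough illustration of the failure mode described above, the sketch below finds field names that occur more than once in a struct's field list. It is not the fix from the linked PR and does not reproduce Beam's actual schema validation, neither of which is shown in this thread. DuplicateFieldSketch and duplicateNames are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not code from Beam or Calcite.
class DuplicateFieldSketch {
  // Returns the names that occur more than once, in the order their first repeat is seen.
  static List<String> duplicateNames(List<String> fieldNames) {
    Set<String> seen = new HashSet<>();
    Set<String> duplicates = new LinkedHashSet<>();
    for (String name : fieldNames) {
      if (!seen.add(name)) {
        duplicates.add(name); // add() returned false, so the name was already present
      }
    }
    return new ArrayList<>(duplicates);
  }

  public static void main(String[] args) {
    // A struct whose first and third fields share a name.
    System.out.println(duplicateNames(Arrays.asList("x", "y", "x")));
    // prints [x]
  }
}
```

   A check along these lines, run on the field names before the explicit struct type is built, would show whether a given struct value can trigger the problem.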







[GitHub] [beam] ZijieSong946 commented on pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
ZijieSong946 commented on pull request #12507:
URL: https://github.com/apache/beam/pull/12507#issuecomment-671554849


   > cc: @ZijieSong946 This recent refactor caused a merge conflict for your WIP TIMESTAMP PR.
   
   Ack.





[GitHub] [beam] robinyqiu commented on a change in pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12507:
URL: https://github.com/apache/beam/pull/12507#discussion_r468095301



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
##########
@@ -17,174 +17,333 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
-import static java.util.stream.Collectors.toList;
-
-import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
+import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
+import com.google.zetasql.functions.ZetaSQLDateTime.DateTimestampPart;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.List;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
+import org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /**
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
- * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
+ * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM (internal), PROTO, GEOGRAPHY
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
 
   private ZetaSqlCalciteTranslationUtils() {}
 
+  // TODO[BEAM-9178]: support DateTimestampPart.WEEK and "WEEK with weekday"s
+  private static final ImmutableMap<Integer, TimeUnit> TIME_UNIT_CASTING_MAP =
+      ImmutableMap.<Integer, TimeUnit>builder()
+          .put(DateTimestampPart.YEAR.getNumber(), TimeUnit.YEAR)
+          .put(DateTimestampPart.MONTH.getNumber(), TimeUnit.MONTH)
+          .put(DateTimestampPart.DAY.getNumber(), TimeUnit.DAY)
+          .put(DateTimestampPart.DAYOFWEEK.getNumber(), TimeUnit.DOW)
+          .put(DateTimestampPart.DAYOFYEAR.getNumber(), TimeUnit.DOY)
+          .put(DateTimestampPart.QUARTER.getNumber(), TimeUnit.QUARTER)
+          .put(DateTimestampPart.HOUR.getNumber(), TimeUnit.HOUR)
+          .put(DateTimestampPart.MINUTE.getNumber(), TimeUnit.MINUTE)
+          .put(DateTimestampPart.SECOND.getNumber(), TimeUnit.SECOND)
+          .put(DateTimestampPart.MILLISECOND.getNumber(), TimeUnit.MILLISECOND)
+          .put(DateTimestampPart.MICROSECOND.getNumber(), TimeUnit.MICROSECOND)
+          .put(DateTimestampPart.ISOYEAR.getNumber(), TimeUnit.ISOYEAR)
+          .put(DateTimestampPart.ISOWEEK.getNumber(), TimeUnit.WEEK)
+          .build();
+
   // Type conversion: Calcite => ZetaSQL
-  public static Type toZetaType(RelDataType calciteType) {
+  public static Type toZetaSqlType(RelDataType calciteType) {
     switch (calciteType.getSqlTypeName()) {
       case BIGINT:
-        return TypeFactory.createSimpleType(TYPE_INT64);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
       case DOUBLE:
-        return TypeFactory.createSimpleType(TYPE_DOUBLE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DOUBLE);
       case BOOLEAN:
-        return TypeFactory.createSimpleType(TYPE_BOOL);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BOOL);
       case VARCHAR:
-        return TypeFactory.createSimpleType(TYPE_STRING);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
       case VARBINARY:
-        return TypeFactory.createSimpleType(TYPE_BYTES);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
       case DECIMAL:
-        return TypeFactory.createSimpleType(TYPE_NUMERIC);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC);
       case DATE:
-        return TypeFactory.createSimpleType(TYPE_DATE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
       case TIME:
-        return TypeFactory.createSimpleType(TYPE_TIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return TypeFactory.createSimpleType(TYPE_DATETIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
       case TIMESTAMP:
-        return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
       case ARRAY:
-        return TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
+        return TypeFactory.createArrayType(toZetaSqlType(calciteType.getComponentType()));
       case ROW:
-        List<StructField> structFields =
+        return TypeFactory.createStructType(
             calciteType.getFieldList().stream()
-                .map(f -> new StructField(f.getName(), toZetaType(f.getType())))
-                .collect(toList());
-
-        return TypeFactory.createStructType(structFields);
+                .map(f -> new StructField(f.getName(), toZetaSqlType(f.getType())))
+                .collect(Collectors.toList()));
       default:
-        throw new UnsupportedOperationException("Unsupported RelDataType: " + calciteType);
+        throw new UnsupportedOperationException(
+            "Unknown Calcite type: " + calciteType.getSqlTypeName().getName());
     }
   }
 
   // Type conversion: ZetaSQL => Calcite
-  public static SqlTypeName toCalciteTypeName(TypeKind type) {
-    switch (type) {
+  public static RelDataType toCalciteType(Type type, boolean nullable, RexBuilder rexBuilder) {
+    RelDataType nonNullType;
+    switch (type.getKind()) {
       case TYPE_INT64:
-        return SqlTypeName.BIGINT;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+        break;
       case TYPE_DOUBLE:
-        return SqlTypeName.DOUBLE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+        break;
       case TYPE_BOOL:
-        return SqlTypeName.BOOLEAN;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN);
+        break;
       case TYPE_STRING:
-        return SqlTypeName.VARCHAR;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+        break;
       case TYPE_BYTES:
-        return SqlTypeName.VARBINARY;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
+        break;
       case TYPE_NUMERIC:
-        return SqlTypeName.DECIMAL;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DECIMAL);
+        break;
       case TYPE_DATE:
-        return SqlTypeName.DATE;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DATE);
+        break;
       case TYPE_TIME:
-        return SqlTypeName.TIME;
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIME);
+        break;
       case TYPE_DATETIME:
-        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        nonNullType =
+            rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        break;
       case TYPE_TIMESTAMP:
-        // TODO: handle timestamp with time zone.
-        return SqlTypeName.TIMESTAMP;
-        // TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
+        nonNullType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      case TYPE_ARRAY:
+        // TODO: Should element type has the same nullability as the array type?
+        nonNullType = toCalciteArrayType(type.asArray().getElementType(), nullable, rexBuilder);
+        break;
+      case TYPE_STRUCT:
+        // TODO: Should field type has the same nullability as the struct type?
+        nonNullType = toCalciteStructType(type.asStruct(), nullable, rexBuilder);
+        break;
       default:
-        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.name());
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + type.getKind().name());
     }
+    return rexBuilder.getTypeFactory().createTypeWithNullability(nonNullType, nullable);
   }
 
-  public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, boolean isNullable) {
-    if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
-      return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
-    } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
-      return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
-    } else {
-      // TODO: Check type's nullability?
-      return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
-    }
+  private static RelDataType toCalciteArrayType(
+      Type elementType, boolean nullable, RexBuilder rexBuilder) {
+    return rexBuilder
+        .getTypeFactory()
+        // -1 cardinality means unlimited array size
+        .createArrayType(toCalciteType(elementType, nullable, rexBuilder), -1);
   }
 
-  public static RelDataType toArrayRelDataType(
-      RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
-    // -1 cardinality means unlimited array size.
-    // TODO: is unlimited array size right for general case?
-    // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
-    return nullable(
-        rexBuilder,
-        rexBuilder
-            .getTypeFactory()
-            .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
-        isNullable);
+  private static RelDataType toCalciteStructType(
+      StructType structType, boolean nullable, RexBuilder rexBuilder) {
+    List<StructField> fields = structType.getFieldList();
+    List<String> fieldNames = getFieldNameList(fields);
+    List<RelDataType> fieldTypes =
+        fields.stream()
+            .map(f -> toCalciteType(f.getType(), nullable, rexBuilder))
+            .collect(Collectors.toList());
+    return rexBuilder.getTypeFactory().createStructType(fieldTypes, fieldNames);
   }
 
-  private static List<String> toNameList(List<StructField> fields) {
+  private static List<String> getFieldNameList(List<StructField> fields) {
     ImmutableList.Builder<String> b = ImmutableList.builder();
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i;
+        name = "$col" + i; // empty field name is not allowed, generate an index-based name for it

Review comment:
       Yeah, this comment is not accurate. The real problem is that Beam does not allow multiple columns with the same name (the empty string in this case). Thanks for spotting this.
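
       To illustrate the point about name collisions, here is a minimal sketch of the renaming loop from the diff. It is an assumption-laden stand-in, not the PR's code: the class name `FieldNameSketch` and the method `fillEmptyNames` are hypothetical, and it takes plain strings rather than ZetaSQL `StructField` objects so that it runs without Beam or ZetaSQL on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for getFieldNameList in the diff above: it operates on
// plain field-name strings instead of ZetaSQL StructField objects.
class FieldNameSketch {
  static List<String> fillEmptyNames(List<String> names) {
    List<String> result = new ArrayList<>(names.size());
    for (int i = 0; i < names.size(); i++) {
      String name = names.get(i);
      // Several anonymous fields would all share the name "", so each one
      // gets a name derived from its position instead.
      result.add("".equals(name) ? "$col" + i : name);
    }
    return result;
  }
}
```

       With the input names `"id"`, `""`, `""`, the two anonymous fields become `$col1` and `$col2`, so no two columns share a name. Like the loop in the diff, this only rewrites empty names; it does not attempt to resolve duplicates among non-empty names.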




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on pull request #12507:
URL: https://github.com/apache/beam/pull/12507#issuecomment-671552069


   cc: @ZijieSong946 This recent refactor caused a merge conflict for your WIP TIMESTAMP PR.





[GitHub] [beam] robinyqiu merged pull request #12507: [BEAM-9179] Move value conversion logic out of ExpressionConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu merged pull request #12507:
URL: https://github.com/apache/beam/pull/12507


   

