You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/21 22:57:18 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #13588: Add logic to convert Beam schema to DataCatalog schema to SchemaUtils

TheNeuralBit commented on a change in pull request #13588:
URL: https://github.com/apache/beam/pull/13588#discussion_r546971301



##########
File path: sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
##########
@@ -53,7 +53,7 @@
           .build();
 
   /** Convert DataCatalog schema to Beam schema. */
-  static Schema fromDataCatalog(com.google.cloud.datacatalog.v1beta1.Schema dcSchema) {
+  public static Schema fromDataCatalog(com.google.cloud.datacatalog.v1beta1.Schema dcSchema) {

Review comment:
      Rather than making these functions public so our internal code can use them, could we try to move the internal code into Beam? I think it should be possible to move it into DataCatalogTableProvider.

##########
File path: sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
##########
@@ -98,4 +98,91 @@ private static FieldType getBeamFieldType(ColumnSchema column) {
     throw new UnsupportedOperationException(
         "Field type '" + dcFieldType + "' is not supported (field '" + column.getColumn() + "')");
   }
+
+  /** Convert Beam schema to DataCatalog schema. */
+  public static com.google.cloud.datacatalog.v1beta1.Schema toDataCatalog(Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema.Builder schemaBuilder =
+        com.google.cloud.datacatalog.v1beta1.Schema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      schemaBuilder.addColumns(fromBeamField(field));
+    }
+    return schemaBuilder.build();
+  }
+
+  private static ColumnSchema fromBeamField(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    if (fieldType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+      if (fieldType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (fieldType.getCollectionElementType().getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array element type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (fieldType.getCollectionElementType().getTypeName().equals(Schema.TypeName.ARRAY)) {
+        throw new UnsupportedOperationException(
+            "Array of arrays not supported in DataCatalog schemas: " + fieldType);
+      }
+      ColumnSchema column =
+          fromBeamField(Field.of(field.getName(), fieldType.getCollectionElementType()));
+      if (!column.getMode().isEmpty()) {
+        // We should have bailed out earlier for any cases that would result in mode being set.
+        throw new AssertionError(
+            "ColumnSchema for collection element type has non-empty mode: " + fieldType);
+      }
+      return column.toBuilder().setMode("REPEATED").build();
+    } else { // struct or primitive type
+      ColumnSchema.Builder colBuilder =
+          ColumnSchema.newBuilder().setType(getDataCatalogType(fieldType));
+
+      if (fieldType.getNullable()) {
+        colBuilder.setMode("NULLABLE");
+      }
+
+      // if this is a struct, add the child columns
+      if (fieldType.getTypeName().equals(Schema.TypeName.ROW)) {
+        for (Schema.Field subField : fieldType.getRowSchema().getFields()) {
+          colBuilder.addSubcolumns(fromBeamField(subField));
+        }
+      }
+
+      return colBuilder.setColumn(field.getName()).build();
+    }
+  }
+
+  private static String getDataCatalogType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case INT32:
+      case INT64:
+      case BYTES:
+      case DOUBLE:
+      case STRING:
+        return fieldType.getTypeName().name();
+      case BOOLEAN:
+        return "BOOL";
+      case DATETIME:
+        return "TIMESTAMP";
+      case DECIMAL:
+        return "NUMERIC";
+      case LOGICAL_TYPE:
+        Schema.LogicalType logical = fieldType.getLogicalType();
+        if (SqlTypes.TIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "TIME";
+        } else if (SqlTypes.DATE.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATE";
+        } else if (SqlTypes.DATETIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATETIME";
+        } else {
+          throw new UnsupportedOperationException("Unsupported logical type: " + logical);
+        }
+      case ROW:
+        return "STRUCT";
+      case MAP:
+        return String.format(
+            "MAP<%s,%s>",
+            getDataCatalogType(fieldType.getMapKeyType()),
+            getDataCatalogType(fieldType.getMapValueType()));
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+    }
+  }

Review comment:
      Could you add unit tests covering the conversion to/from Data Catalog types?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org