You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/12/31 01:12:16 UTC

[beam] branch master updated: [BEAM-11533] Add logic to convert Beam schema to DataCatalog schema to SchemaUtils (#13588)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 48673e8  [BEAM-11533] Add logic to convert Beam schema to DataCatalog schema to SchemaUtils (#13588)
48673e8 is described below

commit 48673e8e2230eef514246686760d8630c24562f9
Author: Yueyang Qiu <ro...@google.com>
AuthorDate: Wed Dec 30 17:11:35 2020 -0800

    [BEAM-11533] Add logic to convert Beam schema to DataCatalog schema to SchemaUtils (#13588)
    
    * Add logic to convert Beam schema to DataCatalog schema to SchemaUtils.java
    
    * Move more internal code to DataCatalogTableProvider
    
    * Add unit tests and set REQUIRED mode for non-null field
    
    * Address comments
---
 .../src/main/resources/beam/suppressions.xml       |   1 +
 .../datacatalog/DataCatalogTableProvider.java      |  61 +++++++-
 .../sql/meta/provider/datacatalog/SchemaUtils.java |  93 ++++++++++++
 .../meta/provider/datacatalog/SchemaUtilsTest.java | 157 +++++++++++++++++++++
 4 files changed, 309 insertions(+), 3 deletions(-)

diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 24c742e..ee03dc3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -98,6 +98,7 @@
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*google.*cloud.*spanner.*FakeBatchTransactionId\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*google.*cloud.*spanner.*FakePartitionFactory\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*extensions.*sql.*datastore.*" />
+  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*datacatalog.*DataCatalogTableProvider\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*DateTimeUtils\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*ZetaSqlBeamTranslationUtils\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*ZetaSqlDialectSpecTest\.java" />
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
index a2eb4e2..0105ab6 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -22,15 +22,19 @@ import static java.util.stream.Collectors.toMap;
 import com.google.api.gax.rpc.InvalidArgumentException;
 import com.google.api.gax.rpc.NotFoundException;
 import com.google.api.gax.rpc.PermissionDeniedException;
+import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.datacatalog.v1beta1.DataCatalogClient;
 import com.google.cloud.datacatalog.v1beta1.DataCatalogSettings;
 import com.google.cloud.datacatalog.v1beta1.Entry;
 import com.google.cloud.datacatalog.v1beta1.LookupEntryRequest;
+import com.google.cloud.datacatalog.v1beta1.UpdateEntryRequest;
+import com.google.protobuf.FieldMask;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.sql.impl.TableName;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -44,11 +48,17 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.threeten.bp.Duration;
 
 /** Uses DataCatalog to get the source type and schema for a table. */
 public class DataCatalogTableProvider extends FullNameTableProvider implements AutoCloseable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataCatalogTableProvider.class);
+
   private static final TableFactory PUBSUB_TABLE_FACTORY = new PubsubTableFactory();
   private static final TableFactory GCS_TABLE_FACTORY = new GcsTableFactory();
 
@@ -144,11 +154,38 @@ public class DataCatalogTableProvider extends FullNameTableProvider implements A
 
   private static DataCatalogClient createDataCatalogClient(DataCatalogPipelineOptions options) {
     try {
-      return DataCatalogClient.create(
+      DataCatalogSettings.Builder builder =
           DataCatalogSettings.newBuilder()
               .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
-              .setEndpoint(options.getDataCatalogEndpoint())
-              .build());
+              .setEndpoint(options.getDataCatalogEndpoint());
+
+      // Retry permission denied errors; they are likely due to sync delay.
+      // Limit max retry delay to 1 minute; at that point it's probably a legitimate permission
+      // error and we should get back to the user.
+      builder
+          .lookupEntrySettings()
+          .setRetryableCodes(
+              ImmutableSet.of(Code.PERMISSION_DENIED, Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE))
+          .setRetrySettings(
+              builder
+                  .lookupEntrySettings()
+                  .getRetrySettings()
+                  .toBuilder()
+                  .setMaxRetryDelay(Duration.ofMinutes(1L))
+                  .build());
+      builder
+          .updateEntrySettings()
+          .setRetryableCodes(
+              ImmutableSet.of(Code.PERMISSION_DENIED, Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE))
+          .setRetrySettings(
+              builder
+                  .updateEntrySettings()
+                  .getRetrySettings()
+                  .toBuilder()
+                  .setMaxRetryDelay(Duration.ofMinutes(1L))
+                  .build());
+
+      return DataCatalogClient.create(builder.build());
     } catch (IOException e) {
       throw new RuntimeException("Error creating Data Catalog client", e);
     }
@@ -178,6 +215,24 @@ public class DataCatalogTableProvider extends FullNameTableProvider implements A
     return tableBuilder.get().schema(schema).name(tableName).build();
   }
 
+  /**
+   * Writes {@code schema} to the Data Catalog entry for {@code resource} if that entry currently
+   * has no columns.
+   *
+   * <p>The lookup and the update are two separate calls, so this is not atomic with respect to
+   * concurrent writers of the same entry.
+   *
+   * @param resource SQL resource name used to look up the entry
+   * @param schema Beam schema to convert and store on the entry
+   * @return {@code true} if the entry was updated, {@code false} if it already had a schema
+   * @throws UnsupportedOperationException if {@code schema} contains a type that cannot be
+   *     converted to a Data Catalog schema
+   */
+  @Internal
+  public boolean setSchemaIfNotPresent(String resource, Schema schema) {
+    // Convert first so that an unsupported schema fails before any remote call is made.
+    com.google.cloud.datacatalog.v1beta1.Schema dcSchema = SchemaUtils.toDataCatalog(schema);
+    Entry entry =
+        dataCatalog.lookupEntry(LookupEntryRequest.newBuilder().setSqlResource(resource).build());
+    if (entry.getSchema().getColumnsCount() == 0) {
+      dataCatalog.updateEntry(
+          UpdateEntryRequest.newBuilder()
+              .setEntry(entry.toBuilder().setSchema(dcSchema).build())
+              // Only the schema field of the entry is overwritten.
+              .setUpdateMask(FieldMask.newBuilder().addPaths("schema").build())
+              .build());
+      return true;
+    } else {
+      // Parameterized logging: the message is only formatted if INFO is enabled.
+      LOG.info("Not updating schema for '{}' since it already has one.", resource);
+      return false;
+    }
+  }
+
   @Override
   public void close() {
     dataCatalog.close();
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
index 2b3bcc3..1ee0847 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -32,6 +32,10 @@ import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
 
 @Experimental(Kind.SCHEMAS)
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
 class SchemaUtils {
 
   private static final Map<String, FieldType> FIELD_TYPES =
@@ -98,4 +102,93 @@ class SchemaUtils {
     throw new UnsupportedOperationException(
         "Field type '" + dcFieldType + "' is not supported (field '" + column.getColumn() + "')");
   }
+
+  /**
+   * Builds the DataCatalog schema that corresponds to a Beam schema, emitting one column per
+   * top-level Beam field in the original field order.
+   */
+  static com.google.cloud.datacatalog.v1beta1.Schema toDataCatalog(Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema.Builder dcSchema =
+        com.google.cloud.datacatalog.v1beta1.Schema.newBuilder();
+    schema.getFields().forEach(beamField -> dcSchema.addColumns(fromBeamField(beamField)));
+    return dcSchema.build();
+  }
+
+  /**
+   * Converts a single Beam field into a DataCatalog {@link ColumnSchema}.
+   *
+   * <p>An array becomes the column of its element type with mode {@code REPEATED}. Every other
+   * type gets mode {@code NULLABLE} or {@code REQUIRED} depending on the field type's
+   * nullability, and a ROW additionally carries one subcolumn per nested field.
+   *
+   * @param field the Beam field to convert
+   * @return the equivalent DataCatalog column, named after {@code field}
+   * @throws UnsupportedOperationException if the field is a nullable array, an array with nullable
+   *     elements, an array of arrays, or of a type that has no DataCatalog type mapping
+   */
+  private static ColumnSchema fromBeamField(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    if (fieldType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+      Schema.FieldType elementType = fieldType.getCollectionElementType();
+      if (fieldType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (elementType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array element type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (elementType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+        throw new UnsupportedOperationException(
+            "Array of arrays not supported in DataCatalog schemas: " + fieldType);
+      }
+      // Convert the element as if it were a plain field of the same name, then mark it repeated.
+      ColumnSchema column = fromBeamField(Field.of(field.getName(), elementType));
+      if (!"REQUIRED".equals(column.getMode())) {
+        // The checks above reject nullable elements and nested arrays, so the element column
+        // must come back as REQUIRED; anything else indicates a bug in this method.
+        throw new AssertionError(
+            "ColumnSchema for collection element type has unexpected mode '"
+                + column.getMode()
+                + "': "
+                + fieldType);
+      }
+      return column.toBuilder().setMode("REPEATED").build();
+    } else { // struct or primitive type
+      ColumnSchema.Builder colBuilder =
+          ColumnSchema.newBuilder().setType(getDataCatalogType(fieldType));
+
+      if (fieldType.getNullable()) {
+        colBuilder.setMode("NULLABLE");
+      } else {
+        colBuilder.setMode("REQUIRED");
+      }
+
+      // if this is a struct, add the child columns
+      if (fieldType.getTypeName().equals(Schema.TypeName.ROW)) {
+        for (Schema.Field subField : fieldType.getRowSchema().getFields()) {
+          colBuilder.addSubcolumns(fromBeamField(subField));
+        }
+      }
+
+      return colBuilder.setColumn(field.getName()).build();
+    }
+  }
+
+  /**
+   * Maps a Beam field type to the type string used in DataCatalog column schemas.
+   *
+   * @param fieldType the Beam type to translate
+   * @return the DataCatalog type name; maps are rendered as {@code MAP<key,value>}
+   * @throws UnsupportedOperationException for Beam types or logical types with no mapping here
+   */
+  private static String getDataCatalogType(FieldType fieldType) {
+    Schema.TypeName typeName = fieldType.getTypeName();
+    switch (typeName) {
+      case BOOLEAN:
+        return "BOOL";
+      case DATETIME:
+        return "TIMESTAMP";
+      case DECIMAL:
+        return "NUMERIC";
+      case ROW:
+        return "STRUCT";
+      case INT32:
+      case INT64:
+      case BYTES:
+      case DOUBLE:
+      case STRING:
+        // For these types the Beam type name is used verbatim.
+        return typeName.name();
+      case MAP:
+        String keyType = getDataCatalogType(fieldType.getMapKeyType());
+        String valueType = getDataCatalogType(fieldType.getMapValueType());
+        return String.format("MAP<%s,%s>", keyType, valueType);
+      case LOGICAL_TYPE:
+        // Logical types are matched by identifier against the known SQL logical types.
+        Schema.LogicalType logicalType = fieldType.getLogicalType();
+        String identifier = logicalType.getIdentifier();
+        if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+          return "TIME";
+        }
+        if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+          return "DATE";
+        }
+        if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+          return "DATETIME";
+        }
+        throw new UnsupportedOperationException("Unsupported logical type: " + logicalType);
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+    }
+  }
 }
diff --git a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java
new file mode 100644
index 0000000..6afc946
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.datacatalog.v1beta1.ColumnSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link SchemaUtils}.
+ *
+ * <p>The same pair of fixtures is used to check the conversion in both directions, so {@link
+ * #TEST_SCHEMA} and {@link #TEST_DC_SCHEMA} must list equivalent fields in the same order.
+ */
+@RunWith(JUnit4.class)
+public class SchemaUtilsTest {
+
+  // Nested row schema used as the type of the "f_struct" field below.
+  private static final Schema TEST_INNER_SCHEMA =
+      Schema.builder().addField("i1", FieldType.INT64).addField("i2", FieldType.STRING).build();
+
+  // Beam-side fixture: nullable primitive fields, SQL logical-type fields, an array field and a
+  // nested row field.
+  private static final Schema TEST_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_int32", FieldType.INT32)
+          .addNullableField("f_int64", FieldType.INT64)
+          .addNullableField("f_bytes", FieldType.BYTES)
+          .addNullableField("f_double", FieldType.DOUBLE)
+          .addNullableField("f_string", FieldType.STRING)
+          .addNullableField("f_bool", FieldType.BOOLEAN)
+          .addNullableField("f_ts", FieldType.DATETIME)
+          .addNullableField("f_numeric", FieldType.DECIMAL)
+          .addLogicalTypeField("f_time", SqlTypes.TIME)
+          .addLogicalTypeField("f_date", SqlTypes.DATE)
+          .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
+          .addArrayField("f_array", FieldType.INT64)
+          .addRowField("f_struct", TEST_INNER_SCHEMA)
+          .build();
+
+  // DataCatalog-side fixture expected to be equivalent to TEST_SCHEMA: the nullable fields carry
+  // mode NULLABLE, the logical-type and row fields REQUIRED, and the array field REPEATED.
+  private static final com.google.cloud.datacatalog.v1beta1.Schema TEST_DC_SCHEMA =
+      com.google.cloud.datacatalog.v1beta1.Schema.newBuilder()
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_int32")
+                  .setType("INT32")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_int64")
+                  .setType("INT64")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_bytes")
+                  .setType("BYTES")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_double")
+                  .setType("DOUBLE")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_string")
+                  .setType("STRING")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_bool")
+                  .setType("BOOL")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_ts")
+                  .setType("TIMESTAMP")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_numeric")
+                  .setType("NUMERIC")
+                  .setMode("NULLABLE")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_time")
+                  .setType("TIME")
+                  .setMode("REQUIRED")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_date")
+                  .setType("DATE")
+                  .setMode("REQUIRED")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_datetime")
+                  .setType("DATETIME")
+                  .setMode("REQUIRED")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_array")
+                  .setType("INT64")
+                  .setMode("REPEATED")
+                  .build())
+          .addColumns(
+              ColumnSchema.newBuilder()
+                  .setColumn("f_struct")
+                  .setType("STRUCT")
+                  .addSubcolumns(
+                      ColumnSchema.newBuilder()
+                          .setColumn("i1")
+                          .setType("INT64")
+                          .setMode("REQUIRED")
+                          .build())
+                  .addSubcolumns(
+                      ColumnSchema.newBuilder()
+                          .setColumn("i2")
+                          .setType("STRING")
+                          .setMode("REQUIRED")
+                          .build())
+                  .setMode("REQUIRED")
+                  .build())
+          .build();
+
+  /** Converting the DataCatalog fixture must yield exactly the Beam fixture. */
+  @Test
+  public void testFromDataCatalog() {
+    assertEquals(TEST_SCHEMA, SchemaUtils.fromDataCatalog(TEST_DC_SCHEMA));
+  }
+
+  /** Converting the Beam fixture must yield exactly the DataCatalog fixture. */
+  @Test
+  public void testToDataCatalog() {
+    assertEquals(TEST_DC_SCHEMA, SchemaUtils.toDataCatalog(TEST_SCHEMA));
+  }
+}