You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/12/31 01:12:16 UTC
[beam] branch master updated: [BEAM-11533] Add logic to convert
Beam schema to DataCatalog schema to SchemaUtils (#13588)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48673e8 [BEAM-11533] Add logic to convert Beam schema to DataCatalog schema to SchemaUtils (#13588)
48673e8 is described below
commit 48673e8e2230eef514246686760d8630c24562f9
Author: Yueyang Qiu <ro...@google.com>
AuthorDate: Wed Dec 30 17:11:35 2020 -0800
[BEAM-11533] Add logic to convert Beam schema to DataCatalog schema to SchemaUtils (#13588)
* Add logic to convert Beam schema to DataCatalog schema to SchemaUtils.java
* Move more internal code to DataCatalogTableProvider
* Add unit tests and set REQUIRED mode for non-null field
* Address comments
---
.../src/main/resources/beam/suppressions.xml | 1 +
.../datacatalog/DataCatalogTableProvider.java | 61 +++++++-
.../sql/meta/provider/datacatalog/SchemaUtils.java | 93 ++++++++++++
.../meta/provider/datacatalog/SchemaUtilsTest.java | 157 +++++++++++++++++++++
4 files changed, 309 insertions(+), 3 deletions(-)
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 24c742e..ee03dc3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -98,6 +98,7 @@
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*google.*cloud.*spanner.*FakeBatchTransactionId\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*google.*cloud.*spanner.*FakePartitionFactory\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*extensions.*sql.*datastore.*" />
+ <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*datacatalog.*DataCatalogTableProvider\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*DateTimeUtils\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*ZetaSqlBeamTranslationUtils\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*zetasql.*ZetaSqlDialectSpecTest\.java" />
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
index a2eb4e2..0105ab6 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -22,15 +22,19 @@ import static java.util.stream.Collectors.toMap;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.PermissionDeniedException;
+import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.datacatalog.v1beta1.DataCatalogClient;
import com.google.cloud.datacatalog.v1beta1.DataCatalogSettings;
import com.google.cloud.datacatalog.v1beta1.Entry;
import com.google.cloud.datacatalog.v1beta1.LookupEntryRequest;
+import com.google.cloud.datacatalog.v1beta1.UpdateEntryRequest;
+import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -44,11 +48,17 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.threeten.bp.Duration;
/** Uses DataCatalog to get the source type and schema for a table. */
public class DataCatalogTableProvider extends FullNameTableProvider implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DataCatalogTableProvider.class);
+
private static final TableFactory PUBSUB_TABLE_FACTORY = new PubsubTableFactory();
private static final TableFactory GCS_TABLE_FACTORY = new GcsTableFactory();
@@ -144,11 +154,38 @@ public class DataCatalogTableProvider extends FullNameTableProvider implements A
private static DataCatalogClient createDataCatalogClient(DataCatalogPipelineOptions options) {
try {
- return DataCatalogClient.create(
+ DataCatalogSettings.Builder builder =
DataCatalogSettings.newBuilder()
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
- .setEndpoint(options.getDataCatalogEndpoint())
- .build());
+ .setEndpoint(options.getDataCatalogEndpoint());
+
+ // Retry permission denied errors, they are likely due to sync delay.
+ // Limit max retry delay to 1 minute, at that point it's probably a legitimate permission error
+ // and we should get back to the user.
+ builder
+ .lookupEntrySettings()
+ .setRetryableCodes(
+ ImmutableSet.of(Code.PERMISSION_DENIED, Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE))
+ .setRetrySettings(
+ builder
+ .lookupEntrySettings()
+ .getRetrySettings()
+ .toBuilder()
+ .setMaxRetryDelay(Duration.ofMinutes(1L))
+ .build());
+ builder
+ .updateEntrySettings()
+ .setRetryableCodes(
+ ImmutableSet.of(Code.PERMISSION_DENIED, Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE))
+ .setRetrySettings(
+ builder
+ .updateEntrySettings()
+ .getRetrySettings()
+ .toBuilder()
+ .setMaxRetryDelay(Duration.ofMinutes(1L))
+ .build());
+
+ return DataCatalogClient.create(builder.build());
} catch (IOException e) {
throw new RuntimeException("Error creating Data Catalog client", e);
}
@@ -178,6 +215,24 @@ public class DataCatalogTableProvider extends FullNameTableProvider implements A
return tableBuilder.get().schema(schema).name(tableName).build();
}
+  /**
+   * Stores {@code schema} on the DataCatalog entry for {@code resource}, but only when that entry
+   * currently has no schema columns.
+   *
+   * <p>The Beam schema is converted to a DataCatalog schema before any DataCatalog call is made,
+   * so a conversion failure happens before the entry is looked up or updated.
+   *
+   * @param resource SQL resource name used to look up the DataCatalog entry
+   * @param schema Beam schema to convert and store on the entry
+   * @return {@code true} if an update request was issued, {@code false} if the entry already had
+   *     at least one column and was left as is
+   */
+  @Internal
+  public boolean setSchemaIfNotPresent(String resource, Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema dcSchema = SchemaUtils.toDataCatalog(schema);
+    Entry entry =
+        dataCatalog.lookupEntry(LookupEntryRequest.newBuilder().setSqlResource(resource).build());
+
+    if (entry.getSchema().getColumnsCount() != 0) {
+      // Parameterized logging: the message is only assembled when INFO is enabled.
+      LOG.info("Not updating schema for '{}' since it already has one.", resource);
+      return false;
+    }
+
+    // The update mask lists only the "schema" path.
+    dataCatalog.updateEntry(
+        UpdateEntryRequest.newBuilder()
+            .setEntry(entry.toBuilder().setSchema(dcSchema).build())
+            .setUpdateMask(FieldMask.newBuilder().addPaths("schema").build())
+            .build());
+    return true;
+  }
+
@Override
public void close() {
dataCatalog.close();
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
index 2b3bcc3..1ee0847 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -32,6 +32,10 @@ import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@Experimental(Kind.SCHEMAS)
+@SuppressWarnings({
+ "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
class SchemaUtils {
private static final Map<String, FieldType> FIELD_TYPES =
@@ -98,4 +102,93 @@ class SchemaUtils {
throw new UnsupportedOperationException(
"Field type '" + dcFieldType + "' is not supported (field '" + column.getColumn() + "')");
}
+
+  /**
+   * Translates a Beam {@link Schema} into a DataCatalog schema, adding one column per top-level
+   * field in field order.
+   */
+  static com.google.cloud.datacatalog.v1beta1.Schema toDataCatalog(Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema.Builder columns =
+        com.google.cloud.datacatalog.v1beta1.Schema.newBuilder();
+    schema.getFields().forEach(beamField -> columns.addColumns(fromBeamField(beamField)));
+    return columns.build();
+  }
+
+  /**
+   * Converts one Beam field into a DataCatalog {@link ColumnSchema}.
+   *
+   * <p>An array field becomes a column of its element type with mode {@code REPEATED}. Any other
+   * field gets mode {@code NULLABLE} or {@code REQUIRED} according to the nullability of its
+   * type, and a row field additionally carries one subcolumn per nested field.
+   *
+   * @param field the Beam field to convert
+   * @return the DataCatalog column, named after {@code field}
+   * @throws UnsupportedOperationException for nullable arrays, arrays with nullable elements,
+   *     arrays of arrays, and types that {@code getDataCatalogType} does not support
+   */
+  private static ColumnSchema fromBeamField(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    if (fieldType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+      Schema.FieldType elementType = fieldType.getCollectionElementType();
+      if (fieldType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (elementType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array element type is not supported in DataCatalog schemas: " + fieldType);
+      } else if (elementType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+        throw new UnsupportedOperationException(
+            "Array of arrays not supported in DataCatalog schemas: " + fieldType);
+      }
+
+      // Convert the element type as if it were a field of the same name, then mark it repeated.
+      ColumnSchema column = fromBeamField(Field.of(field.getName(), elementType));
+      if (!column.getMode().equals("REQUIRED")) {
+        // The element type was checked above to be non-nullable and not an array, so its column
+        // must come back as REQUIRED. Anything else is a bug in this method.
+        throw new AssertionError(
+            "ColumnSchema for collection element type has mode '"
+                + column.getMode()
+                + "' instead of REQUIRED: "
+                + fieldType);
+      }
+      return column.toBuilder().setMode("REPEATED").build();
+    } else { // struct or primitive type
+      ColumnSchema.Builder colBuilder =
+          ColumnSchema.newBuilder().setType(getDataCatalogType(fieldType));
+
+      if (fieldType.getNullable()) {
+        colBuilder.setMode("NULLABLE");
+      } else {
+        colBuilder.setMode("REQUIRED");
+      }
+
+      // if this is a struct, add the child columns
+      if (fieldType.getTypeName().equals(Schema.TypeName.ROW)) {
+        for (Schema.Field subField : fieldType.getRowSchema().getFields()) {
+          colBuilder.addSubcolumns(fromBeamField(subField));
+        }
+      }
+
+      return colBuilder.setColumn(field.getName()).build();
+    }
+  }
+
+ private static String getDataCatalogType(FieldType fieldType) {
+ switch (fieldType.getTypeName()) {
+ case INT32:
+ case INT64:
+ case BYTES:
+ case DOUBLE:
+ case STRING:
+ return fieldType.getTypeName().name();
+ case BOOLEAN:
+ return "BOOL";
+ case DATETIME:
+ return "TIMESTAMP";
+ case DECIMAL:
+ return "NUMERIC";
+ case LOGICAL_TYPE:
+ Schema.LogicalType logical = fieldType.getLogicalType();
+ if (SqlTypes.TIME.getIdentifier().equals(logical.getIdentifier())) {
+ return "TIME";
+ } else if (SqlTypes.DATE.getIdentifier().equals(logical.getIdentifier())) {
+ return "DATE";
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(logical.getIdentifier())) {
+ return "DATETIME";
+ } else {
+ throw new UnsupportedOperationException("Unsupported logical type: " + logical);
+ }
+ case ROW:
+ return "STRUCT";
+ case MAP:
+ return String.format(
+ "MAP<%s,%s>",
+ getDataCatalogType(fieldType.getMapKeyType()),
+ getDataCatalogType(fieldType.getMapValueType()));
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+ }
+ }
}
diff --git a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java
new file mode 100644
index 0000000..6afc946
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtilsTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.datacatalog.v1beta1.ColumnSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SchemaUtils}. */
+@RunWith(JUnit4.class)
+public class SchemaUtilsTest {
+
+ private static final Schema TEST_INNER_SCHEMA =
+ Schema.builder().addField("i1", FieldType.INT64).addField("i2", FieldType.STRING).build();
+
+ private static final Schema TEST_SCHEMA =
+ Schema.builder()
+ .addNullableField("f_int32", FieldType.INT32)
+ .addNullableField("f_int64", FieldType.INT64)
+ .addNullableField("f_bytes", FieldType.BYTES)
+ .addNullableField("f_double", FieldType.DOUBLE)
+ .addNullableField("f_string", FieldType.STRING)
+ .addNullableField("f_bool", FieldType.BOOLEAN)
+ .addNullableField("f_ts", FieldType.DATETIME)
+ .addNullableField("f_numeric", FieldType.DECIMAL)
+ .addLogicalTypeField("f_time", SqlTypes.TIME)
+ .addLogicalTypeField("f_date", SqlTypes.DATE)
+ .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
+ .addArrayField("f_array", FieldType.INT64)
+ .addRowField("f_struct", TEST_INNER_SCHEMA)
+ .build();
+
+ private static final com.google.cloud.datacatalog.v1beta1.Schema TEST_DC_SCHEMA =
+ com.google.cloud.datacatalog.v1beta1.Schema.newBuilder()
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_int32")
+ .setType("INT32")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_int64")
+ .setType("INT64")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_bytes")
+ .setType("BYTES")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_double")
+ .setType("DOUBLE")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_string")
+ .setType("STRING")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_bool")
+ .setType("BOOL")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_ts")
+ .setType("TIMESTAMP")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_numeric")
+ .setType("NUMERIC")
+ .setMode("NULLABLE")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_time")
+ .setType("TIME")
+ .setMode("REQUIRED")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_date")
+ .setType("DATE")
+ .setMode("REQUIRED")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_datetime")
+ .setType("DATETIME")
+ .setMode("REQUIRED")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_array")
+ .setType("INT64")
+ .setMode("REPEATED")
+ .build())
+ .addColumns(
+ ColumnSchema.newBuilder()
+ .setColumn("f_struct")
+ .setType("STRUCT")
+ .addSubcolumns(
+ ColumnSchema.newBuilder()
+ .setColumn("i1")
+ .setType("INT64")
+ .setMode("REQUIRED")
+ .build())
+ .addSubcolumns(
+ ColumnSchema.newBuilder()
+ .setColumn("i2")
+ .setType("STRING")
+ .setMode("REQUIRED")
+ .build())
+ .setMode("REQUIRED")
+ .build())
+ .build();
+
+ @Test
+ public void testFromDataCatalog() {
+ assertEquals(TEST_SCHEMA, SchemaUtils.fromDataCatalog(TEST_DC_SCHEMA));
+ }
+
+ @Test
+ public void testToDataCatalog() {
+ assertEquals(TEST_DC_SCHEMA, SchemaUtils.toDataCatalog(TEST_SCHEMA));
+ }
+}