Posted to github@beam.apache.org by "reuvenlax (via GitHub)" <gi...@apache.org> on 2023/05/19 20:21:59 UTC

[GitHub] [beam] reuvenlax opened a new pull request, #26794: #26789 Fix auto schema update when schema order has changed.

reuvenlax opened a new pull request, #26794:
URL: https://github.com/apache/beam/pull/26794

   R: @yirutang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] reuvenlax commented on a diff in pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on code in PR #26794:
URL: https://github.com/apache/beam/pull/26794#discussion_r1199640894


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/** Helper utilities for handling schema-update responses. */
+public class TableSchemaUpdateUtils {
+  /*
+  Given an original schema and an updated schema, return a schema that should be used to process future records.
+  This function returns:
+      - If the new schema is not compatible (e.g. missing fields), then it will return Optional.empty().
+      - The returned schema will always contain the old schema as a prefix. This ensures that if any of the old
+       fields are reordered in the new schema, we maintain the old order.
+   */
+  public static Optional<TableSchema> getUpdatedSchema(
+      TableSchema oldSchema, TableSchema newSchema) {
+    Optional<List<TableFieldSchema>> updatedFields =
+        getUpdatedSchema(oldSchema.getFieldsList(), newSchema.getFieldsList());
+    return updatedFields.map(
+        tableFieldSchemas -> TableSchema.newBuilder().addAllFields(tableFieldSchemas).build());
+  }
+
+  private static Optional<List<TableFieldSchema>> getUpdatedSchema(
+      @Nullable List<TableFieldSchema> oldSchema, @Nullable List<TableFieldSchema> newSchema) {
+    if (newSchema == null) {
+      return Optional.empty();
+    }
+    if (oldSchema == null) {
+      return Optional.of(newSchema);
+    }
+
+    Map<String, TableFieldSchema> newSchemaMap =
+        newSchema.stream().collect(Collectors.toMap(TableFieldSchema::getName, x -> x));
+    Set<String> fieldNamesPopulated = Sets.newHashSet();
+    List<TableFieldSchema> updatedSchema = Lists.newArrayList();
+    for (TableFieldSchema tableFieldSchema : oldSchema) {
+      @Nullable TableFieldSchema newTableFieldSchema = newSchemaMap.get(tableFieldSchema.getName());
+      if (newTableFieldSchema == null) {
+        // We don't support deleting fields!
+        return Optional.empty();
+      }
+      TableFieldSchema.Builder updatedTableFieldSchema = newTableFieldSchema.toBuilder();
+      updatedTableFieldSchema.clearFields();
+      if (tableFieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        Optional<List<TableFieldSchema>> updatedTableFields =
+            getUpdatedSchema(tableFieldSchema.getFieldsList(), newTableFieldSchema.getFieldsList());
+        if (!updatedTableFields.isPresent()) {
+          return Optional.empty();

Review Comment:
   I don't think so. It means the nested row was incompatible (essentially, a field was deleted). In theory we should never hit this, since BigQuery should not allow a table to be updated to an incompatible schema.
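
For readers following the thread, here is a minimal, self-contained sketch of the behavior under discussion: old fields keep their old order, fields that exist only in the new schema are appended, and an incompatibility inside a nested struct rejects the whole schema. It is not the Beam implementation. `SchemaMergeSketch`, `Field`, and `merge` are hypothetical names, the field model is a simplified stand-in for `TableFieldSchema`, and the step that appends new fields is an assumption based on the "old schema as a prefix" comment, since the quoted hunk stops before that part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified stand-in for the schema merge discussed above; not the Beam code. */
public class SchemaMergeSketch {

  /** Hypothetical field model: a name plus child fields (empty for non-struct fields). */
  static final class Field {
    final String name;
    final List<Field> children;

    Field(String name, Field... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }

    @Override
    public String toString() {
      return children.isEmpty() ? name : name + children;
    }
  }

  /**
   * Returns the old fields in their old order followed by fields found only in the new
   * schema, or Optional.empty() if the new schema drops a field at any nesting level.
   */
  static Optional<List<Field>> merge(List<Field> oldFields, List<Field> newFields) {
    // LinkedHashMap keeps the new schema's order for the fields appended at the end.
    Map<String, Field> newByName = new LinkedHashMap<>();
    for (Field f : newFields) {
      newByName.put(f.name, f);
    }
    List<Field> merged = new ArrayList<>();
    for (Field oldField : oldFields) {
      Field newField = newByName.remove(oldField.name);
      if (newField == null) {
        return Optional.empty(); // a field was deleted: incompatible
      }
      Optional<List<Field>> mergedChildren = merge(oldField.children, newField.children);
      if (!mergedChildren.isPresent()) {
        return Optional.empty(); // a nested incompatibility rejects the whole schema
      }
      merged.add(new Field(oldField.name, mergedChildren.get().toArray(new Field[0])));
    }
    merged.addAll(newByName.values()); // fields that exist only in the new schema
    return Optional.of(merged);
  }

  public static void main(String[] args) {
    List<Field> oldFields =
        Arrays.asList(
            new Field("a"), new Field("b"), new Field("s", new Field("x"), new Field("y")));

    // Reordered at both levels, with one new field added at each level.
    List<Field> reordered =
        Arrays.asList(
            new Field("s", new Field("y"), new Field("x"), new Field("z")),
            new Field("b"),
            new Field("a"),
            new Field("d"));
    System.out.println(merge(oldFields, reordered)); // Optional[[a, b, s[x, y, z], d]]

    // The nested field "y" was dropped, so the whole update is rejected.
    List<Field> dropped =
        Arrays.asList(new Field("a"), new Field("b"), new Field("s", new Field("x")));
    System.out.println(merge(oldFields, dropped)); // Optional.empty
  }
}
```

In this sketch the early return is what makes a nested deletion behave the same as a top-level deletion, which matches the reasoning given in the comment above.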



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1985,9 +1985,13 @@ public void updateTableSchemaTest(boolean useSet) throws Exception {
         new TableSchema()
             .setFields(
                 ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
                     new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("name").setType("STRING"),
                     new TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
+

Review Comment:
   Done in the other unit test.





[GitHub] [beam] yirutang commented on a diff in pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "yirutang (via GitHub)" <gi...@apache.org>.
yirutang commented on code in PR #26794:
URL: https://github.com/apache/beam/pull/26794#discussion_r1199636901


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtilsTest.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the {@link TableSchemaUpdateUtils class}. */
+@RunWith(JUnit4.class)
+public class TableSchemaUpdateUtilsTest {
+  @Test
+  public void testSchemaUpdate() {
+    TableSchema baseSchema =
+        TableSchema.newBuilder()
+            .addFields(
+                TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING))
+            .build();
+    TableSchema schema =
+        TableSchema.newBuilder()
+            .addFields(
+                TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder()

Review Comment:
   Could you also test more than one layer of nesting?





[GitHub] [beam] reuvenlax commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1555940714

   Run Spotless PreCommit




[GitHub] [beam] reuvenlax commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1555940884

   run java precommit




[GitHub] [beam] reuvenlax merged pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax merged PR #26794:
URL: https://github.com/apache/beam/pull/26794




[GitHub] [beam] reuvenlax commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1555940776

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] reuvenlax commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1556103317

   run java precommit




[GitHub] [beam] yirutang commented on a diff in pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "yirutang (via GitHub)" <gi...@apache.org>.
yirutang commented on code in PR #26794:
URL: https://github.com/apache/beam/pull/26794#discussion_r1199496151


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/** Helper utilities for handling schema-update responses. */
+public class TableSchemaUpdateUtils {
+  /*
+  Given an original schema and an updated schema, return a schema that should be used to process future records.
+  This function returns:
+      - If the new schema is not compatible (e.g. missing fields), then it will return Optional.empty().
+      - The returned schema will always contain the old schema as a prefix. This ensures that if any of the old
+       fields are reordered in the new schema, we maintain the old order.
+   */
+  public static Optional<TableSchema> getUpdatedSchema(
+      TableSchema oldSchema, TableSchema newSchema) {
+    Optional<List<TableFieldSchema>> updatedFields =
+        getUpdatedSchema(oldSchema.getFieldsList(), newSchema.getFieldsList());
+    return updatedFields.map(
+        tableFieldSchemas -> TableSchema.newBuilder().addAllFields(tableFieldSchemas).build());
+  }
+
+  private static Optional<List<TableFieldSchema>> getUpdatedSchema(
+      @Nullable List<TableFieldSchema> oldSchema, @Nullable List<TableFieldSchema> newSchema) {
+    if (newSchema == null) {
+      return Optional.empty();
+    }
+    if (oldSchema == null) {
+      return Optional.of(newSchema);
+    }
+
+    Map<String, TableFieldSchema> newSchemaMap =
+        newSchema.stream().collect(Collectors.toMap(TableFieldSchema::getName, x -> x));
+    Set<String> fieldNamesPopulated = Sets.newHashSet();
+    List<TableFieldSchema> updatedSchema = Lists.newArrayList();
+    for (TableFieldSchema tableFieldSchema : oldSchema) {
+      @Nullable TableFieldSchema newTableFieldSchema = newSchemaMap.get(tableFieldSchema.getName());
+      if (newTableFieldSchema == null) {
+        // We don't support deleting fields!
+        return Optional.empty();
+      }
+      TableFieldSchema.Builder updatedTableFieldSchema = newTableFieldSchema.toBuilder();
+      updatedTableFieldSchema.clearFields();
+      if (tableFieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        Optional<List<TableFieldSchema>> updatedTableFields =
+            getUpdatedSchema(tableFieldSchema.getFieldsList(), newTableFieldSchema.getFieldsList());
+        if (!updatedTableFields.isPresent()) {
+          return Optional.empty();

Review Comment:
   Is this return too aggressive?





[GitHub] [beam] github-actions[bot] commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1555249236

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @Abacn for label java.
   R: @manavgarg for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] yirutang commented on a diff in pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "yirutang (via GitHub)" <gi...@apache.org>.
yirutang commented on code in PR #26794:
URL: https://github.com/apache/beam/pull/26794#discussion_r1199404720


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1985,9 +1985,13 @@ public void updateTableSchemaTest(boolean useSet) throws Exception {
         new TableSchema()
             .setFields(
                 ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
                     new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("name").setType("STRING"),
                     new TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
+

Review Comment:
   Could you test the nested case as well?





[GitHub] [beam] reuvenlax commented on a diff in pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on code in PR #26794:
URL: https://github.com/apache/beam/pull/26794#discussion_r1199641534


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtilsTest.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the {@link TableSchemaUpdateUtils class}. */
+@RunWith(JUnit4.class)
+public class TableSchemaUpdateUtilsTest {
+  @Test
+  public void testSchemaUpdate() {
+    TableSchema baseSchema =
+        TableSchema.newBuilder()
+            .addFields(
+                TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING))
+            .build();
+    TableSchema schema =
+        TableSchema.newBuilder()
+            .addFields(
+                TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING))
+            .addFields(
+                TableFieldSchema.newBuilder()

Review Comment:
   Added another layer.





[GitHub] [beam] reuvenlax commented on pull request #26794: #26789 Fix auto schema update when schema order has changed.

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26794:
URL: https://github.com/apache/beam/pull/26794#issuecomment-1556046475

   run java precommit

