You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/09 22:18:12 UTC

[GitHub] [beam] vachan-shetty opened a new pull request, #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing.

vachan-shetty opened a new pull request, #24074:
URL: https://github.com/apache/beam/pull/24074

   This change also adds a BigQuery option that allows users to specify the initial number of Streams in a BQ Read API Session.
   
   Customers using BEAM with BigQuery Read API experience errors (like `Root cause: The worker lost contact with the service`) whenever Dataflow decides to aggressively rebalance work items (i.e. calling `splitAtFraction()`). This change provides customers with an emergency mechanism that will allow pipelines experiencing a high volume of such errors to run to completion. 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #24074:
URL: https://github.com/apache/beam/pull/24074#discussion_r1021962550


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadWithoutDynamicWorkflowRebalancingTest.java:
##########
@@ -0,0 +1,1526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static java.util.Arrays.asList;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.api.services.bigquery.model.Streamingbuffer;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.cloud.bigquery.storage.v1.AvroRows;
+import com.google.cloud.bigquery.storage.v1.AvroSchema;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
+import com.google.cloud.bigquery.storage.v1.StreamStats;
+import com.google.cloud.bigquery.storage.v1.StreamStats.Progress;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.Text;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/**
+ * Tests for {@link BigQueryIO#readTableRows() using {@link Method#DIRECT_READ}} with dynamic
+ * workflow rebalancing DISABLED.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageReadWithoutDynamicWorkflowRebalancingTest {
+
+  private transient PipelineOptions options;
+  private final transient TemporaryFolder testFolder = new TemporaryFolder();
+  private transient TestPipeline p;
+  private BufferAllocator allocator;
+
+  @Rule
+  public final transient TestRule folderThenPipeline =
+      new TestRule() {
+        @Override
+        public Statement apply(Statement base, Description description) {
+          // We need to set up the temporary folder, and then set up the TestPipeline based on the
+          // chosen folder. Unfortunately, since rule evaluation order is unspecified and unrelated
+          // to field order, and is separate from construction, that requires manually creating this
+          // TestRule.
+          Statement withPipeline =
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  options = TestPipeline.testingPipelineOptions();
+                  options.as(BigQueryOptions.class).setProject("project-id");
+                  if (description.getAnnotations().stream()
+                      .anyMatch(a -> a.annotationType().equals(ProjectOverride.class))) {
+                    options.as(BigQueryOptions.class).setBigQueryProject("bigquery-project-id");
+                  }
+                  options
+                      .as(BigQueryOptions.class)
+                      .setTempLocation(testFolder.getRoot().getAbsolutePath());
+                  options.as(BigQueryOptions.class).setDynamicWorkflowRebalancingEnabled(false);
+                  p = TestPipeline.fromOptions(options);
+                  p.apply(base, description).evaluate();
+                }
+              };
+          return testFolder.apply(withPipeline, description);
+        }
+      };
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+
+  @Before
+  public void setUp() throws Exception {
+    FakeDatasetService.setUp();
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void teardown() {
+    allocator.close();
+  }
+
+  @Test
+  public void testBuildTableBasedSource() {

Review Comment:
   Seems like we are just copying all tests to test file (note necessarily tests related to dynamic work rebalancing) ? Can we cleanup these test files to share code more so that we do not copy around a lot of code ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1332057128

   Reminder, please take a look at this pr: @robertwb @chamikaramj 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1323570987

   Reminder, please take a look at this pr: @robertwb @chamikaramj 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1337573263

   waiting on author


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1309509295

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1337241519

   Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1324400959

   @vachan-shetty happy to merge after the comment is addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1426758466

   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] closed pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API
URL: https://github.com/apache/beam/pull/24074


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24074: Add BigQueryOptions to disable Dynamic Work Rebalancing in the Read API

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24074:
URL: https://github.com/apache/beam/pull/24074#issuecomment-1416742865

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org