You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "vachan-shetty (via GitHub)" <gi...@apache.org> on 2023/02/08 23:02:26 UTC

[GitHub] [beam] vachan-shetty opened a new pull request, #25392: Read API Source v2

vachan-shetty opened a new pull request, #25392:
URL: https://github.com/apache/beam/pull/25392

   Adding the new Read API Source which supports bundling of Streams within Read Session. Customers can enable/disable this feature using the `setEnableBundling` BigQuery pipeline option (the option flag currently defaults to false).
   
   The design doc for the changes can be found [here]( https://docs.google.com/document/d/1rMv5lH5uRHjCA9v_xYdZ_fY1XweAfhzmq7oh_0w9d8c/edit?usp=sharing). This PR addresses #24260.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25392: Read API Source v2

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109125140


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -160,8 +161,9 @@ public long getEstimatedSizeBytes(PipelineOptions options) {
   @Override
   public List<? extends OffsetBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) {
-    // A stream source can't be split without reading from it due to server-side liquid sharding.
-    // TODO: Implement dynamic work rebalancing.
+    // This method is only called for initial splits. Since this class will always be a child source
+    // of BigQueryStorageSourceBase, all splits here will be handled by `splitAtFraction()`. As a
+    // result, this is a no-op.

Review Comment:
   FYI because it was mentioned previously as `TODO: Implement dynamic work rebalancing`, splitAtFraction() _is_ dynamic work rebalancing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey merged pull request #25392: Read API Source v2

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey merged PR #25392:
URL: https://github.com/apache/beam/pull/25392


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kmjung commented on a diff in pull request #25392: Read API Source v2

Posted by "kmjung (via GitHub)" <gi...@apache.org>.
kmjung commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1101005715


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -150,4 +150,12 @@
   Long getStorageWriteApiMaxRequestSize();
 
   void setStorageWriteApiMaxRequestSize(Long value);
+
+  @Description(
+      "If set, BigQueryIO.Read will use the StreamBundle based"
+          + "implementation of the Read API Source")
+  @Default.Boolean(false)
+  Boolean getEnableBundling();

Review Comment:
   This should be marked as experimental, I think?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Given that we're not including the schema in the output PCollection here and are instead passing it as a separate side input, is there any reason to bundle in this code path?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,19 +205,34 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
-    for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    } else {

Review Comment:
   Nit: since you unconditionally return in the 'if' block, you can remove the 'else' block.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Also, given how much code duplication already exists in this and other non-BoundedSource expansion paths, can we find a way to consolidate? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    // The underlying OffsetBasedSource (and RangeTracker) operate only on the StreamBundle and NOT
+    // the Streams that constitute the StreamBundle. More specifically, the offsets in the
+    // OffsetBasedSource are indices for the StreamBundle List.
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }
+
+  @Override
+  public long getMaxEndOffset(PipelineOptions options) throws Exception {
+    return this.streamBundle.size();
+  }
+
+  @Override
+  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+    List<ReadStream> newStreamBundle = streamBundle.subList((int) start, (int) end);
+    return fromExisting(newStreamBundle);
+  }
+
+  @Override
+  public BigQueryStorageStreamBundleReader<T> createReader(PipelineOptions options)
+      throws IOException {
+    return new BigQueryStorageStreamBundleReader<>(this, options.as(BigQueryOptions.class));
+  }
+
+  public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedReader<T> {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);
+
+    private final BigQueryStorageReader reader;
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final StorageClient storageClient;
+    private final TableSchema tableSchema;
+
+    private BigQueryStorageStreamBundleSource<T> source;
+    private @Nullable BigQueryServerStream<ReadRowsResponse> responseStream = null;
+    private @Nullable Iterator<ReadRowsResponse> responseIterator = null;
+    private @Nullable T current = null;
+    private int currentStreamBundleIndex;
+    private long currentStreamOffset;
+
+    // Values used for progress reporting.
+    private double fractionOfStreamBundleConsumed;
+
+    private double progressAtResponseStart;
+    private double progressAtResponseEnd;
+    private long rowsConsumedFromCurrentResponse;
+    private long totalRowsInCurrentResponse;
+
+    private @Nullable TableReference tableReference;
+    private @Nullable ServiceCallMetric serviceCallMetric;
+
+    private BigQueryStorageStreamBundleReader(
+        BigQueryStorageStreamBundleSource<T> source, BigQueryOptions options) throws IOException {
+      super(source);
+      this.source = source;
+      this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
+      this.parseFn = source.parseFn;
+      this.storageClient = source.bqServices.getStorageClient(options);
+      this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
+      this.currentStreamBundleIndex = 0;
+      this.fractionOfStreamBundleConsumed = 0d;
+      this.progressAtResponseStart = 0d;
+      this.progressAtResponseEnd = 0d;
+      this.rowsConsumedFromCurrentResponse = 0L;
+      this.totalRowsInCurrentResponse = 0L;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return currentStreamBundleIndex;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      // The start of every Stream within a StreamBundle is being defined as a split point. This
+      // implies that we cannot split below the granularity of a Stream
+      if (currentStreamOffset == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public boolean startImpl() throws IOException {
+      return readNextStream();
+    }
+
+    @Override
+    public boolean advanceImpl() throws IOException {
+      Preconditions.checkStateNotNull(responseIterator);
+      currentStreamOffset += totalRowsInCurrentResponse;
+      return readNextRecord();
+    }
+
+    private synchronized boolean readNextStream() throws IOException {

Review Comment:
   Making this method synchronized means that we'll perform the entirety of the `ReadRows` call -- including any backoff / retry due to concurrency limits -- with the object lock held. Did we not find a way to avoid this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1102205268


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1102191526


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    // The underlying OffsetBasedSource (and RangeTracker) operate only on the StreamBundle and NOT
+    // the Streams that constitute the StreamBundle. More specifically, the offsets in the
+    // OffsetBasedSource are indices for the StreamBundle List.
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }
+
+  @Override
+  public long getMaxEndOffset(PipelineOptions options) throws Exception {
+    return this.streamBundle.size();
+  }
+
+  @Override
+  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+    List<ReadStream> newStreamBundle = streamBundle.subList((int) start, (int) end);
+    return fromExisting(newStreamBundle);
+  }
+
+  @Override
+  public BigQueryStorageStreamBundleReader<T> createReader(PipelineOptions options)
+      throws IOException {
+    return new BigQueryStorageStreamBundleReader<>(this, options.as(BigQueryOptions.class));
+  }
+
+  public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedReader<T> {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);
+
+    private final BigQueryStorageReader reader;
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final StorageClient storageClient;
+    private final TableSchema tableSchema;
+
+    private BigQueryStorageStreamBundleSource<T> source;
+    private @Nullable BigQueryServerStream<ReadRowsResponse> responseStream = null;
+    private @Nullable Iterator<ReadRowsResponse> responseIterator = null;
+    private @Nullable T current = null;
+    private int currentStreamBundleIndex;
+    private long currentStreamOffset;
+
+    // Values used for progress reporting.
+    private double fractionOfStreamBundleConsumed;
+
+    private double progressAtResponseStart;
+    private double progressAtResponseEnd;
+    private long rowsConsumedFromCurrentResponse;
+    private long totalRowsInCurrentResponse;
+
+    private @Nullable TableReference tableReference;
+    private @Nullable ServiceCallMetric serviceCallMetric;
+
+    private BigQueryStorageStreamBundleReader(
+        BigQueryStorageStreamBundleSource<T> source, BigQueryOptions options) throws IOException {
+      super(source);
+      this.source = source;
+      this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
+      this.parseFn = source.parseFn;
+      this.storageClient = source.bqServices.getStorageClient(options);
+      this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
+      this.currentStreamBundleIndex = 0;
+      this.fractionOfStreamBundleConsumed = 0d;
+      this.progressAtResponseStart = 0d;
+      this.progressAtResponseEnd = 0d;
+      this.rowsConsumedFromCurrentResponse = 0L;
+      this.totalRowsInCurrentResponse = 0L;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return currentStreamBundleIndex;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      // The start of every Stream within a StreamBundle is being defined as a split point. This
+      // implies that we cannot split below the granularity of a Stream
+      if (currentStreamOffset == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public boolean startImpl() throws IOException {
+      return readNextStream();
+    }
+
+    @Override
+    public boolean advanceImpl() throws IOException {
+      Preconditions.checkStateNotNull(responseIterator);
+      currentStreamOffset += totalRowsInCurrentResponse;
+      return readNextRecord();
+    }
+
+    private synchronized boolean readNextStream() throws IOException {

Review Comment:
   Removed the `synchronized` from `readNextStream()`. Also added a Javadoc explaining how this Source handles possible race conditions.
   
   In summary: the underlying `OffsetBasedSource` and `OffsetRangeTracker` only operate in the split-point space (which in this case are StreamBundle indices). As a result, the `RangeTracker` does NOT interact with the underlying Stream directly. This in turn allows us to rely on the synchronization guarantees provided by `RangeTracker` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1423389995

   R: @johnjcasey, @lukecwik, @kmjung 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on pull request #25392: Read API Source v2

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1440195809

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25392: Read API Source v2

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1423475139

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1102191267


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Trying to address this in a follow-up commit for now. Due to the urgency of the customer issue I might also try to make clean-up changes in a follow-up PR.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,19 +205,34 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
-    for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    } else {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25392: Read API Source v2

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1440635907

   Test suites aren't statusing, but they've all completed successfully except for https://ci-beam.apache.org/job/beam_PreCommit_Java_GCP_IO_Direct_Commit/1778/ (Java_GCP_IO_Direct) which is still running and seems stuck at the start


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on pull request #25392: Read API Source v2

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1433888709

   Failing test `org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOWriteTest.testTriggeredFileLoadsWithTempTablesToExistingNullSchemaTable[1]` in Java_GCP_IO_Direct is a known issue #25207. Can be ignored


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1430597008

   Retest this please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109110165


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,18 +205,32 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    }
+    List<ReadStream> streamBundle = Lists.newArrayList();
+    List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      streamIndex++;
+      streamBundle.add(readStream);
+      if (streamIndex % streamsPerBundle == 0) {
+        sources.add(
+            BigQueryStorageStreamBundleSource.create(
+                readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
+        streamBundle = Lists.newArrayList();
+      }

Review Comment:
   Done.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.beam.sdk.io.Source} representing a bundle of Streams in a BigQuery ReadAPI
+ * Session. This Source ONLY supports splitting at the StreamBundle level.
+ *
+ * <p>{@link BigQueryStorageStreamBundleSource} defines a split-point as the starting offset of each
+ * Stream. As a result, the number of valid split points in the Source is equal to the number of
+ * Streams in the StreamBundle and this Source does NOT support sub-Stream splitting.
+ *
+ * <p>Additionally, the underlying {@link org.apache.beam.sdk.io.range.OffsetRangeTracker} and
+ * {@link OffsetBasedSource} operate in the split point space and do NOT directly interact with the
+ * Streams constituting the StreamBundle. Consequently, fractional values used in
+ * `splitAtFraction()` are translated into StreamBundleIndices and the underlying RangeTracker
+ * handles the split operation by checking the validity of the split point. This has the following
+ * implications for the `splitAtFraction()` operation:
+ *
+ * <p>1. Fraction values that point to the "middle" of a Stream will be translated to the
+ * appropriate Stream boundary by the RangeTracker.
+ *
+ * <p>2. Once a Stream is being read from, the RangeTracker will only accept `splitAtFraction()`
+ * calls that point to StreamBundleIndices that are greater than the StreamBundleIndex of the
+ * current Stream
+ *
+ * @param <T> Type of records represented by the source.
+ * @see OffsetBasedSource
+ * @see org.apache.beam.sdk.io.range.OffsetRangeTracker
+ * @see org.apache.beam.sdk.io.BlockBasedSource (semantically similar to {@link
+ *     BigQueryStorageStreamBundleSource})
+ */
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }

Review Comment:
   As discussed offline, the `split()` method is only used for initial splits. Subsequent splits are handled using `splitAtFraction()`. I've updated the comments to reflect this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109109843


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());

Review Comment:
   I did this because it made it easier when I was debugging the changes. Reverted now.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25392: Read API Source v2

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1440704428

   That succeeded - https://ci-beam.apache.org/job/beam_PreCommit_Java_GCP_IO_Direct_Commit/1778/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109110301


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);
+        streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / bytesPerStream);
+      } else {
+        streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
+      }
+      streamsPerBundle = Math.min(streamCount, streamsPerBundle);
+      LOG.info("streamsPerBundle: '{}'", streamsPerBundle);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1428489199

   R: @ahmedabu98 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1432182120

   Retest this please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25392: Read API Source v2

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1107774129


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",

Review Comment:
   `"Estimated bytes this read session will scan when all streams are consumed"` ?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());

Review Comment:
   Is there a reason these are split into two logs? Combining may be more readable.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1527,233 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollectionTuple createTupleForDirectRead(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, ReadStream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            c.output(readStream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, List<ReadStream>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+                          int streamIndex = 0;
+                          int streamsPerBundle = 10;
+                          List<ReadStream> streamBundle = Lists.newArrayList();
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            streamIndex++;
+                            streamBundle.add(readStream);
+                            if (streamIndex % streamsPerBundle == 0) {
+                              c.output(streamBundle);
+                              streamBundle = Lists.newArrayList();
+                            }
+                          }
+

Review Comment:
   ```suggestion
                               if (streamIndex % streamsPerBundle == 0) {
                                 c.output(streamBundle);
                                 streamBundle = Lists.newArrayList();
                               }
                             }
                             if (streamIndex % streamsPerBundle != 0) {
                               c.output(streamBundle);
                             }
   ```
   
   Should also account for the last `streamBundle` that may not have a perfect 10 readStreams.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);

Review Comment:
   `"Estimated bytes each ReadStream will consume"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);
+        streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / bytesPerStream);
+      } else {
+        streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
+      }
+      streamsPerBundle = Math.min(streamCount, streamsPerBundle);
+      LOG.info("streamsPerBundle: '{}'", streamsPerBundle);

Review Comment:
   `"Distributing {} ReadStreams per bundle"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.beam.sdk.io.Source} representing a bundle of Streams in a BigQuery ReadAPI
+ * Session. This Source ONLY supports splitting at the StreamBundle level.
+ *
+ * <p>{@link BigQueryStorageStreamBundleSource} defines a split-point as the starting offset of each
+ * Stream. As a result, the number of valid split points in the Source is equal to the number of
+ * Streams in the StreamBundle and this Source does NOT support sub-Stream splitting.
+ *
+ * <p>Additionally, the underlying {@link org.apache.beam.sdk.io.range.OffsetRangeTracker} and
+ * {@link OffsetBasedSource} operate in the split point space and do NOT directly interact with the
+ * Streams constituting the StreamBundle. Consequently, fractional values used in
+ * `splitAtFraction()` are translated into StreamBundleIndices and the underlying RangeTracker
+ * handles the split operation by checking the validity of the split point. This has the following
+ * implications for the `splitAtFraction()` operation:
+ *
+ * <p>1. Fraction values that point to the "middle" of a Stream will be translated to the
+ * appropriate Stream boundary by the RangeTracker.
+ *
+ * <p>2. Once a Stream is being read from, the RangeTracker will only accept `splitAtFraction()`
+ * calls that point to StreamBundleIndices that are greater than the StreamBundleIndex of the
+ * current Stream
+ *
+ * @param <T> Type of records represented by the source.
+ * @see OffsetBasedSource
+ * @see org.apache.beam.sdk.io.range.OffsetRangeTracker
+ * @see org.apache.beam.sdk.io.BlockBasedSource (semantically similar to {@link
+ *     BigQueryStorageStreamBundleSource})
+ */
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }

Review Comment:
   Would it be useful to implement this method?  The stream bundle could be split down the same way it's done in BigQueryStorageSourceBase.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,18 +205,32 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    }
+    List<ReadStream> streamBundle = Lists.newArrayList();
+    List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      streamIndex++;
+      streamBundle.add(readStream);
+      if (streamIndex % streamsPerBundle == 0) {
+        sources.add(
+            BigQueryStorageStreamBundleSource.create(
+                readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
+        streamBundle = Lists.newArrayList();
+      }

Review Comment:
   Similarly, add the last `streamBundle` to `sources` if it didn't make the `streamsPerBundle` threshold.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] vachan-shetty commented on a diff in pull request #25392: Read API Source v2

Posted by "vachan-shetty (via GitHub)" <gi...@apache.org>.
vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109110017


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);

Review Comment:
   Done.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1527,233 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollectionTuple createTupleForDirectRead(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, ReadStream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            c.output(readStream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, List<ReadStream>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+                          int streamIndex = 0;
+                          int streamsPerBundle = 10;
+                          List<ReadStream> streamBundle = Lists.newArrayList();
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            streamIndex++;
+                            streamBundle.add(readStream);
+                            if (streamIndex % streamsPerBundle == 0) {
+                              c.output(streamBundle);
+                              streamBundle = Lists.newArrayList();
+                            }
+                          }
+

Review Comment:
   Nice catch! Added your suggested change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25392: Read API Source v2

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25392:
URL: https://github.com/apache/beam/pull/25392#issuecomment-1440636066

   Run Java_GCP_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org