Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2023/02/16 17:20:08 UTC

[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25392: Read API Source v2

ahmedabu98 commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1107774129


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",

Review Comment:
   `"Estimated bytes this read session will scan when all streams are consumed"` ?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());

Review Comment:
   Is there a reason these are split into two log statements? Combining them into one may be more readable.
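
   A minimal sketch of what the combined call might look like (illustrative, not the PR's code):
   ```java
   LOG.info(
       "Sent BigQuery Storage API CreateReadSession request '{}'; received {} streams in response.",
       createReadSessionRequest,
       readSession.getStreamsList().size());
   ```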



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1527,233 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollectionTuple createTupleForDirectRead(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, ReadStream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            c.output(readStream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, List<ReadStream>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          // The getTargetTable call runs a new instance of the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              BigQueryHelpers.toTableResourceName(
+                                                  queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+                          int streamIndex = 0;
+                          int streamsPerBundle = 10;
+                          List<ReadStream> streamBundle = Lists.newArrayList();
+                          for (ReadStream readStream : readSession.getStreamsList()) {
+                            streamIndex++;
+                            streamBundle.add(readStream);
+                            if (streamIndex % streamsPerBundle == 0) {
+                              c.output(streamBundle);
+                              streamBundle = Lists.newArrayList();
+                            }
+                          }
+

Review Comment:
   ```suggestion
                               if (streamIndex % streamsPerBundle == 0) {
                                 c.output(streamBundle);
                                 streamBundle = Lists.newArrayList();
                               }
                             }
                             if (streamIndex % streamsPerBundle != 0) {
                               c.output(streamBundle);
                             }
   ```
   
   This should also account for the last `streamBundle`, which may contain fewer than the full 10 readStreams.
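
   As an alternative sketch, Guava's `Lists.partition` (assuming the vendored import) handles the trailing partial bundle automatically:
   ```java
   // Lists.partition returns consecutive sublists; the final sublist may
   // hold fewer than streamsPerBundle elements, so no remainder check is needed.
   for (List<ReadStream> bundle :
       Lists.partition(readSession.getStreamsList(), streamsPerBundle)) {
     c.output(bundle);
   }
   ```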



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);

Review Comment:
   `"Estimated bytes each ReadStream will consume"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);
+        streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / bytesPerStream);
+      } else {
+        streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
+      }
+      streamsPerBundle = Math.min(streamCount, streamsPerBundle);
+      LOG.info("streamsPerBundle: '{}'", streamsPerBundle);

Review Comment:
   `"Distributing {} ReadStreams per bundle"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.beam.sdk.io.Source} representing a bundle of Streams in a BigQuery ReadAPI
+ * Session. This Source ONLY supports splitting at the StreamBundle level.
+ *
+ * <p>{@link BigQueryStorageStreamBundleSource} defines a split-point as the starting offset of each
+ * Stream. As a result, the number of valid split points in the Source is equal to the number of
+ * Streams in the StreamBundle and this Source does NOT support sub-Stream splitting.
+ *
+ * <p>Additionally, the underlying {@link org.apache.beam.sdk.io.range.OffsetRangeTracker} and
+ * {@link OffsetBasedSource} operate in the split point space and do NOT directly interact with the
+ * Streams constituting the StreamBundle. Consequently, fractional values used in
+ * `splitAtFraction()` are translated into StreamBundleIndices and the underlying RangeTracker
+ * handles the split operation by checking the validity of the split point. This has the following
+ * implications for the `splitAtFraction()` operation:
+ *
+ * <p>1. Fraction values that point to the "middle" of a Stream will be translated to the
+ * appropriate Stream boundary by the RangeTracker.
+ *
+ * <p>2. Once a Stream is being read from, the RangeTracker will only accept `splitAtFraction()`
+ * calls that point to StreamBundleIndices that are greater than the StreamBundleIndex of the
+ * current Stream
+ *
+ * @param <T> Type of records represented by the source.
+ * @see OffsetBasedSource
+ * @see org.apache.beam.sdk.io.range.OffsetRangeTracker
+ * @see org.apache.beam.sdk.io.BlockBasedSource (semantically similar to {@link
+ *     BigQueryStorageStreamBundleSource})
+ */
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }

Review Comment:
   Would it be useful to implement this method? The stream bundle could be split further, in the same way it's done in `BigQueryStorageSourceBase`.
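
   For illustration, a rough sketch of such a `split`, reusing the class's `fromExisting` helper and Guava's `Lists.partition` (the halving heuristic is an assumption, not the PR's logic):
   ```java
   @Override
   public List<? extends OffsetBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) {
     // Assumption: halve the current bundle; a real implementation would
     // derive the sub-bundle size from desiredBundleSizeBytes.
     int subBundleSize = Math.max(1, streamBundle.size() / 2);
     ImmutableList.Builder<BigQueryStorageStreamBundleSource<T>> sources =
         ImmutableList.builder();
     for (List<ReadStream> subBundle : Lists.partition(streamBundle, subBundleSize)) {
       sources.add(fromExisting(subBundle));
     }
     return sources.build();
   }
   ```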



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,18 +205,32 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    }
+    List<ReadStream> streamBundle = Lists.newArrayList();
+    List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      streamIndex++;
+      streamBundle.add(readStream);
+      if (streamIndex % streamsPerBundle == 0) {
+        sources.add(
+            BigQueryStorageStreamBundleSource.create(
+                readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
+        streamBundle = Lists.newArrayList();
+      }

Review Comment:
   Similarly, add the last `streamBundle` to `sources` if it didn't reach the `streamsPerBundle` threshold.
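
   A sketch of the trailing flush after the loop, mirroring the `create` call quoted above:
   ```java
   // Flush any remaining streams that didn't fill a complete bundle.
   if (!streamBundle.isEmpty()) {
     sources.add(
         BigQueryStorageStreamBundleSource.create(
             readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
   }
   ```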


