Posted to github@beam.apache.org by "kmjung (via GitHub)" <gi...@apache.org> on 2023/02/09 05:52:02 UTC

[GitHub] [beam] kmjung commented on a diff in pull request #25392: Read API Source v2

kmjung commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1101005715


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -150,4 +150,12 @@
   Long getStorageWriteApiMaxRequestSize();
 
   void setStorageWriteApiMaxRequestSize(Long value);
+
+  @Description(
+      "If set, BigQueryIO.Read will use the StreamBundle based"
+          + "implementation of the Read API Source")
+  @Default.Boolean(false)
+  Boolean getEnableBundling();

Review Comment:
   This should be marked as experimental, I think?
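
   A rough sketch of what that could look like, assuming the SDK's
   `@Experimental` annotation (org.apache.beam.sdk.annotations.Experimental)
   is still the convention for new opt-in options like this one:

   ```java
   import org.apache.beam.sdk.annotations.Experimental;
   import org.apache.beam.sdk.annotations.Experimental.Kind;

   // Hypothetical: flags the new option as experimental so callers know the
   // StreamBundle-based Read API Source may change or be removed.
   @Experimental(Kind.SOURCE_SINK)
   @Description(
       "If set, BigQueryIO.Read will use the StreamBundle based "
           + "implementation of the Read API Source")
   @Default.Boolean(false)
   Boolean getEnableBundling();
   ```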



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Given that we're not including the schema in the output PCollection here and are instead passing it as a separate side input, is there any reason to bundle in this code path?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,19 +205,34 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
-    for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    } else {

Review Comment:
   Nit: since you unconditionally return in the 'if' block, you can remove the 'else' block.
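
   For example, the same code reads a little flatter with the bundling path
   simply continuing after the early return:

   ```java
   if (!bqOptions.getEnableBundling()) {
     List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
       sources.add(
           BigQueryStorageStreamSource.create(
               readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
     }
     return ImmutableList.copyOf(sources);
   }
   // The StreamBundle path continues here without the extra level of nesting.
   ```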



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   Also, given how much code duplication already exists in this and other non-BoundedSource expansion paths, can we find a way to consolidate? 
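
   One option, as a rough sketch only: pull the duplicated job-ID plumbing that
   shows up in both the existing direct-read expansion and this new method into
   a shared private helper. The helper name below is hypothetical:

   ```java
   // Hypothetical helper; builds the singleton job ID token view that every
   // template-compatible expansion path constructs today.
   private PCollectionView<String> createJobIdTokenView(Pipeline p) {
     return p.apply("TriggerIdCreation", Create.of("ignored"))
         .apply(
             "CreateJobId",
             MapElements.via(
                 new SimpleFunction<String, String>() {
                   @Override
                   public String apply(String input) {
                     return BigQueryHelpers.randomUUIDString();
                   }
                 }))
         .apply("ViewId", View.asSingleton());
   }
   ```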



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@code
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    // The underlying OffsetBasedSource (and RangeTracker) operate only on the StreamBundle and NOT
+    // the Streams that constitute the StreamBundle. More specifically, the offsets in the
+    // OffsetBasedSource are indices for the StreamBundle List.
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of a stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }
+
+  @Override
+  public long getMaxEndOffset(PipelineOptions options) throws Exception {
+    return this.streamBundle.size();
+  }
+
+  @Override
+  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+    List<ReadStream> newStreamBundle = streamBundle.subList((int) start, (int) end);
+    return fromExisting(newStreamBundle);
+  }
+
+  @Override
+  public BigQueryStorageStreamBundleReader<T> createReader(PipelineOptions options)
+      throws IOException {
+    return new BigQueryStorageStreamBundleReader<>(this, options.as(BigQueryOptions.class));
+  }
+
+  public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedReader<T> {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);
+
+    private final BigQueryStorageReader reader;
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final StorageClient storageClient;
+    private final TableSchema tableSchema;
+
+    private BigQueryStorageStreamBundleSource<T> source;
+    private @Nullable BigQueryServerStream<ReadRowsResponse> responseStream = null;
+    private @Nullable Iterator<ReadRowsResponse> responseIterator = null;
+    private @Nullable T current = null;
+    private int currentStreamBundleIndex;
+    private long currentStreamOffset;
+
+    // Values used for progress reporting.
+    private double fractionOfStreamBundleConsumed;
+
+    private double progressAtResponseStart;
+    private double progressAtResponseEnd;
+    private long rowsConsumedFromCurrentResponse;
+    private long totalRowsInCurrentResponse;
+
+    private @Nullable TableReference tableReference;
+    private @Nullable ServiceCallMetric serviceCallMetric;
+
+    private BigQueryStorageStreamBundleReader(
+        BigQueryStorageStreamBundleSource<T> source, BigQueryOptions options) throws IOException {
+      super(source);
+      this.source = source;
+      this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
+      this.parseFn = source.parseFn;
+      this.storageClient = source.bqServices.getStorageClient(options);
+      this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
+      this.currentStreamBundleIndex = 0;
+      this.fractionOfStreamBundleConsumed = 0d;
+      this.progressAtResponseStart = 0d;
+      this.progressAtResponseEnd = 0d;
+      this.rowsConsumedFromCurrentResponse = 0L;
+      this.totalRowsInCurrentResponse = 0L;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return currentStreamBundleIndex;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      // The start of every Stream within a StreamBundle is defined as a split point. This
+      // implies that we cannot split below the granularity of a Stream.
+      if (currentStreamOffset == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public boolean startImpl() throws IOException {
+      return readNextStream();
+    }
+
+    @Override
+    public boolean advanceImpl() throws IOException {
+      Preconditions.checkStateNotNull(responseIterator);
+      currentStreamOffset += totalRowsInCurrentResponse;
+      return readNextRecord();
+    }
+
+    private synchronized boolean readNextStream() throws IOException {

Review Comment:
   Making this method synchronized means that we'll perform the entirety of the `ReadRows` call -- including any backoff / retry due to concurrency limits -- with the object lock held. Did we not find a way to avoid this?
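
   For illustration only -- a sketch of one shape that keeps the RPC outside the
   monitor, taking the lock just to publish the shared iterator state. Field and
   method names mirror this class, but this is not the PR's actual body:

   ```java
   private boolean readNextStream() throws IOException {
     ReadStream readStream;
     synchronized (this) {
       if (currentStreamBundleIndex >= source.streamBundle.size()) {
         return false;
       }
       readStream = source.streamBundle.get(currentStreamBundleIndex);
     }
     ReadRowsRequest request =
         ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
     // The ReadRows call -- including any backoff / retry on concurrency
     // limits -- happens with no lock held.
     BigQueryServerStream<ReadRowsResponse> stream = storageClient.readRows(request);
     synchronized (this) {
       responseStream = stream;
       responseIterator = stream.iterator();
       currentStreamOffset = 0;
     }
     return readNextRecord();
   }
   ```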



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org