Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/14 17:02:30 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #11767: [BEAM-11648] BigQuery Storage API sink

chamikaramj commented on a change in pull request #11767:
URL: https://github.com/apache/beam/pull/11767#discussion_r592921411



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1724,7 +1725,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
        * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
        * BigQuery</a>.
        */
-      STREAMING_INSERTS
+      STREAMING_INSERTS,
+      STORAGE_API_WRITES

Review comment:
       Should this be experimental initially?
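       For example, something along these lines (just a sketch; assuming Beam's @Experimental
       annotation can be applied to the enum constant):

           /** Use the BigQuery Storage Write API. Experimental while the sink stabilizes. */
           @Experimental(Kind.SOURCE_SINK)
           STORAGE_API_WRITES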

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -164,6 +172,47 @@ void createDataset(
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
+
+    /** Create a Write Stream for use with the Storage Write API. */
+    WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
+        throws IOException, InterruptedException;
+
+    /**
+     * Create an append client for a given Storage API write stream. The stream must be created
+     * first.
+     */
+    StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+
+    /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */
+    ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
+        throws IOException, InterruptedException;
+
+    /**

Review comment:
       Probably cleaner to move these methods to a new interface?
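       For example (just a sketch; the interface name is a placeholder):

           /** Operations on BigQuery Storage Write API streams. */
           interface WriteStreamService {
             WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
                 throws IOException, InterruptedException;

             StreamAppendClient getStreamAppendClient(String streamName) throws Exception;

             ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
                 throws IOException, InterruptedException;

             ApiFuture<FinalizeWriteStreamResponse> finalizeWriteStream(String streamName);

             ApiFuture<BatchCommitWriteStreamsResponse> commitWriteStreams(
                 String tableUrn, Iterable<String> writeStreamNames);
           }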

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
##########
@@ -78,4 +78,19 @@
   Integer getBqStreamingApiLoggingFrequencySec();
 
   void setBqStreamingApiLoggingFrequencySec(Integer value);
+
+  @Description("If set, then BigQueryIO.Write will default to using the Storage API.")
+  @Default.Boolean(false)
+  Boolean getUseStorageApiWrites();

Review comment:
       Ditto regarding checking the naming.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -164,6 +172,47 @@ void createDataset(
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
+
+    /** Create a Write Stream for use with the Storage Write API. */
+    WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
+        throws IOException, InterruptedException;
+
+    /**
+     * Create an append client for a given Storage API write stream. The stream must be created
+     * first.
+     */
+    StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+
+    /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */
+    ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
+        throws IOException, InterruptedException;
+
+    /**
+     * Finalize a write stream. After finalization, no more records can be appended to the stream.
+     */
+    ApiFuture<FinalizeWriteStreamResponse> finalizeWriteStream(String streamName);
+
+    /** Commit write streams of type PENDING. The streams must be finalized before committing. */
+    ApiFuture<BatchCommitWriteStreamsResponse> commitWriteStreams(
+        String tableUrn, Iterable<String> writeStreamNames);
+  }
+
+  /** An interface for appending records to a Storage API write stream. */
+  interface StreamAppendClient extends AutoCloseable {

Review comment:
       Are these methods thread-safe? We should probably clarify that here.
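       For example, the interface Javadoc could spell it out (a sketch; whether the guarantee
       actually holds is for the author to confirm):

           /**
            * An interface for appending records to a Storage API write stream.
            *
            * <p>Thread safety: implementations must (or must not -- whichever is true) support
            * concurrent calls to appendRows, pin and unpin from multiple threads.
            */
           interface StreamAppendClient extends AutoCloseable {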

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1664,6 +1664,7 @@ static String getExtractDestinationUri(String extractDestinationDir) {
         .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
         .setSchemaUpdateOptions(Collections.emptySet())
         .setNumFileShards(0)
+        .setNumStorageApiStreams(0)

Review comment:
       I assume zero just means the default? (Probably good to clarify with a comment.)
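       If so, a short inline comment would make the intent obvious, e.g. (assuming zero really
       does mean "use the default"):

           .setNumStorageApiStreams(0) // 0 means the sink chooses the number of streams.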

##########
File path: examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryStorageAPIStreamingIT.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.auto.value.AutoValue;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQueryStorageAPIStreamingIT {
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Value {
+    public abstract long getNumber();
+
+    @Nullable
+    public abstract ByteBuffer getPayload();
+  }
+
+  public interface Options extends TestPipelineOptions {
+    @Description("The number of records per second to generate.")
+    @Default.Integer(10000)
+    Integer getRecordsPerSecond();
+
+    void setRecordsPerSecond(Integer recordsPerSecond);
+
+    @Description("The size of the records to write in bytes.")
+    @Default.Integer(1024)
+    Integer getPayloadSizeBytes();
+
+    void setPayloadSizeBytes(Integer payloadSizeBytes);
+
+    @Description("Parallelism used for Storage API writes.")
+    @Default.Integer(5)
+    Integer getNumShards();
+
+    void setNumShards(Integer numShards);
+
+    @Description("Frequency to trigger appends. Each shard triggers independently.")
+    @Default.Integer(5)
+    Integer getTriggerFrequencySec();
+
+    void setTriggerFrequencySec(Integer triggerFrequencySec);
+
+    @Description("The table to write to.")
+    String getTargetTable();
+
+    void setTargetTable(String table);
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(Options.class);
+  }
+
+  @Test
+  public void testStorageAPIStreaming() throws Exception {
+    Options options = TestPipeline.testingPipelineOptions().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    final int payloadSizeBytes = options.getPayloadSizeBytes();
+
+    // Generate input.
+    PCollection<Value> values =
+        //   p.apply(GenerateSequence.from(1).to(1000000L))

Review comment:
       There are some commented-out lines here and below.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
##########
@@ -135,16 +135,36 @@
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;

Review comment:
       I haven't looked into the tests in detail yet, but given the extremely large change and the number of potential users, we should try to get close to 100% test coverage and 100% test parity with streaming inserts (both unit and integration).

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1771,6 +1773,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
 
     abstract int getNumFileShards();
 
+    abstract int getNumStorageApiStreams();

Review comment:
       Replace "StorageApi" with "WriteApi" for clarity here and elsewhere ? I'm not sure what the correct terminology is (Storage API or Write API) but we use term "Storage API" for the Read API [1] elsewhere.
   
   [1] https://cloud.google.com/bigquery/docs/reference/storage

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -1055,6 +1084,104 @@ public Table patchTableDescription(
           createDefaultBackoff(),
           ALWAYS_RETRY);
     }
+
+    @Override
+    public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
+        throws IOException {
+      return newWriteClient.createWriteStream(
+          CreateWriteStreamRequest.newBuilder()
+              .setParent(tableUrn)
+              .setWriteStream(WriteStream.newBuilder().setType(type).build())
+              .build());
+    }
+
+    @Override
+    public StreamAppendClient getStreamAppendClient(String streamName) throws Exception {
+      StreamWriterV2 streamWriter = StreamWriterV2.newBuilder(streamName).build();
+      return new StreamAppendClient() {
+        private int pins = 0;
+        private boolean closed = false;
+
+        @Override
+        public void close() throws Exception {
+          boolean closeWriter;
+          synchronized (this) {
+            Preconditions.checkState(!closed);
+            closed = true;
+            closeWriter = (pins == 0);
+          }
+          if (closeWriter) {
+            streamWriter.close();
+          }
+        }
+
+        @Override
+        public void pin() {
+          synchronized (this) {
+            Preconditions.checkState(!closed);
+            ++pins;
+          }
+        }
+
+        @Override
+        public void unpin() throws Exception {
+          boolean closeWriter;
+          synchronized (this) {
+            Preconditions.checkState(pins > 0);
+            --pins;
+            closeWriter = (pins == 0) && closed;
+          }
+          if (closeWriter) {
+            streamWriter.close();
+          }
+        }
+
+        @Override
+        public ApiFuture<AppendRowsResponse> appendRows(
+            long offset, ProtoRows rows, Descriptor descriptor) throws Exception {
+          final AppendRowsRequest.ProtoData data =
+              AppendRowsRequest.ProtoData.newBuilder()
+                  .setWriterSchema(
+                      ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build())
+                  .setRows(rows)
+                  .build();
+          AppendRowsRequest.Builder appendRequestBuilder =
+              AppendRowsRequest.newBuilder().setProtoRows(data).setWriteStream(streamName);
+          if (offset >= 0) {
+            appendRequestBuilder = appendRequestBuilder.setOffset(Int64Value.of(offset));
+          }
+          return streamWriter.append(appendRequestBuilder.build());

Review comment:
       Does offset==0 mean the current offset, or do we have to explicitly set it to 0?
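       In other words, is the intended contract the following (my reading of the code above; the
       negative sentinel is what the "offset >= 0" check suggests)?

           // offset >= 0: append at that explicit offset (0 is a real offset, not "current").
           // offset < 0:  no offset is set and the service assigns the next one.
           appendClient.appendRows(-1, rows, descriptor); // append without an explicit offset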

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -164,6 +172,47 @@ void createDataset(
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
+
+    /** Create a Write Stream for use with the Storage Write API. */
+    WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
+        throws IOException, InterruptedException;
+
+    /**
+     * Create an append client for a given Storage API write stream. The stream must be created
+     * first.
+     */
+    StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+
+    /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */

Review comment:
       Could you clarify what this means? I'm a bit confused, since flushing a stream is usually not conditional on an offset.
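       If the intent is the BUFFERED-stream semantics (appended rows stay invisible until
       FlushRows is called, and the offset means "make everything up to and including this row
       visible"), stating that explicitly plus a tiny example would help, e.g.:

           // Make rows [0, lastAppendedOffset] of the BUFFERED stream visible in the table.
           ApiFuture<FlushRowsResponse> flushed = datasetService.flush(streamName, lastAppendedOffset);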

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
+import org.joda.time.Duration;
+
+/**
+ * Retry manager used by Storage API operations. This class manages a sequence of operations (e.g.
+ * sequential appends to a stream) and retries of those operations.
+ */
+class RetryManager<ResultT, ContextT extends Context<ResultT>> {
+  private Queue<Operation<ResultT, ContextT>> operations;
+  private final BackOff backoff;
+  private final ExecutorService executor;
+
+  enum RetryType {
+    DONT_RETRY,

Review comment:
       Do we want to support more customization here (in the future)?
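       For example, things like (purely hypothetical values):

           enum RetryType {
             DONT_RETRY,
             RETRY_ALL_OPERATIONS,
             // possible future additions:
             // RETRY_FAILED_OPERATIONS, // retry only the operations that failed
             // RETRY_WITH_BACKOFF_RESET // restart the backoff before retrying
           }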

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
+public class StorageApiLoads<DestinationT, ElementT>
+    extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageApiLoads.class);
+  static final int FILE_TRIGGERING_RECORD_COUNT = 100;
+
+  private final Coder<DestinationT> destinationCoder;
+  private final Coder<ElementT> elementCoder;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+  private final CreateDisposition createDisposition;
+  private final String kmsKey;
+  private final Duration triggeringFrequency;
+  private final BigQueryServices bqServices;
+  private final int numShards;
+
+  public StorageApiLoads(
+      Coder<DestinationT> destinationCoder,
+      Coder<ElementT> elementCoder,
+      StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+      CreateDisposition createDisposition,
+      String kmsKey,
+      Duration triggeringFrequency,
+      BigQueryServices bqServices,
+      int numShards) {
+    this.destinationCoder = destinationCoder;
+    this.elementCoder = elementCoder;
+    this.dynamicDestinations = dynamicDestinations;
+    this.createDisposition = createDisposition;
+    this.kmsKey = kmsKey;
+    this.triggeringFrequency = triggeringFrequency;
+    this.bqServices = bqServices;
+    this.numShards = numShards;
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
+    return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
+  }
+
+  public WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {

Review comment:
       Why do we need triggered writes for the Storage API? Is batching at the write DoFn inadequate for some reason?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
+import org.joda.time.Duration;
+
+/**
+ * Retry manager used by Storage API operations. This class manages a sequence of operations (e.g.
+ * sequential appends to a stream) and retries of those operations.
+ */
+class RetryManager<ResultT, ContextT extends Context<ResultT>> {
+  private Queue<Operation<ResultT, ContextT>> operations;
+  private final BackOff backoff;
+  private final ExecutorService executor;
+
+  enum RetryType {
+    DONT_RETRY,
+    RETRY_ALL_OPERATIONS
+  };
+
+  RetryManager(Duration initialBackoff, Duration maxBackoff, int maxRetries) {
+    this.operations = Queues.newArrayDeque();
+    backoff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(initialBackoff)
+            .withMaxBackoff(maxBackoff)
+            .withMaxRetries(maxRetries)
+            .backoff();
+    this.executor = Executors.newCachedThreadPool();
+  }
+
+  static class Operation<ResultT, ContextT extends Context<ResultT>> {
+    static class Context<ResultT> {
+      private @Nullable Throwable error = null;
+      private @Nullable ResultT result = null;
+
+      public void setError(@Nullable Throwable error) {
+        this.error = error;
+      }
+
+      public @Nullable Throwable getError() {
+        return error;
+      }
+
+      public void setResult(@Nullable ResultT result) {
+        this.result = result;
+      }
+
+      public @Nullable ResultT getResult() {
+        return result;
+      }
+    }
+
+    private final Function<ContextT, ApiFuture<ResultT>> runOperation;
+    private final Function<Iterable<ContextT>, RetryType> onError;
+    private final Consumer<ContextT> onSuccess;
+    @Nullable private ApiFuture<ResultT> future = null;
+    @Nullable private Callback<ResultT> callback = null;
+    @Nullable ContextT context = null;
+
+    public Operation(
+        Function<ContextT, ApiFuture<ResultT>> runOperation,
+        Function<Iterable<ContextT>, RetryType> onError,
+        Consumer<ContextT> onSuccess,
+        ContextT context) {
+      this.runOperation = runOperation;
+      this.onError = onError;
+      this.onSuccess = onSuccess;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"nullness"})
+    void run(Executor executor) {
+      this.future = runOperation.apply(context);
+      this.callback = new Callback<>();
+      ApiFutures.addCallback(future, callback, executor);
+    }
+
+    @SuppressWarnings({"nullness"})
+    boolean await() throws Exception {
+      callback.await();
+      return callback.getFailed();
+    }
+  }
+
+  private static class Callback<ResultT> implements ApiFutureCallback<ResultT> {
+    private final CountDownLatch waiter;
+    @Nullable private Throwable failure = null;
+    boolean failed = false;
+
+    Callback() {
+      this.waiter = new CountDownLatch(1);
+    }
+
+    void await() throws InterruptedException {
+      waiter.await();
+    }
+
+    boolean await(long timeoutSec) throws InterruptedException {
+      return waiter.await(timeoutSec, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      synchronized (this) {
+        failure = t;
+        failed = true;
+      }
+      waiter.countDown();
+    }
+
+    @Override
+    public void onSuccess(ResultT result) {
+      synchronized (this) {
+        failure = null;
+      }
+      waiter.countDown();
+    }
+
+    @Nullable
+    Throwable getFailure() {
+      synchronized (this) {
+        return failure;
+      }
+    }
+
+    boolean getFailed() {
+      synchronized (this) {
+        return failed;
+      }
+    }
+  }
+
+  void addOperation(
+      Function<ContextT, ApiFuture<ResultT>> runOperation,
+      Function<Iterable<ContextT>, RetryType> onError,
+      Consumer<ContextT> onSuccess,
+      ContextT context)
+      throws Exception {
+    addOperation(new Operation<>(runOperation, onError, onSuccess, context));
+  }
+
+  void addAndRunOperation(
+      Function<ContextT, ApiFuture<ResultT>> runOperation,
+      Function<Iterable<ContextT>, RetryType> onError,
+      Consumer<ContextT> onSuccess,
+      ContextT context)
+      throws Exception {
+    addAndRunOperation(new Operation<>(runOperation, onError, onSuccess, context));
+  }
+
+  void addOperation(Operation<ResultT, ContextT> operation) {
+    operations.add(operation);
+  }
+
+  void addAndRunOperation(Operation<ResultT, ContextT> operation) {
+    operation.run(executor);
+    operations.add(operation);
+  }
+
+  void run(boolean await) throws Exception {
+    for (Operation<ResultT, ContextT> operation : operations) {
+      operation.run(executor);
+    }
+    if (await) {
+      await();
+    }
+  }
+
+  @SuppressWarnings({"nullness"})
+  void await() throws Exception {
+    while (!this.operations.isEmpty()) {
+      Operation<ResultT, ContextT> operation = this.operations.element();
+      boolean failed = operation.await();
+      if (failed) {
+        Throwable failure = operation.callback.getFailure();
+        operation.context.setError(failure);
+        RetryType retryType =
+            operation.onError.apply(
+                operations.stream().map(o -> o.context).collect(Collectors.toList()));
+        if (retryType != RetryType.DONT_RETRY) {
+          Preconditions.checkState(RetryType.RETRY_ALL_OPERATIONS == retryType);
+          if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+            throw new RuntimeException(failure);
+          }
+          for (Operation<ResultT, ?> awaitOperation : operations) {
+            awaitOperation.await();
+          }
+          // Run all the operations again.
+          run(false);
+        } else {

Review comment:
       Probably cleaner to explicitly check for "RetryType.DONT_RETRY" here and fail in an else clause.
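       i.e. something like (a sketch reusing the code above):

           if (retryType == RetryType.RETRY_ALL_OPERATIONS) {
             if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
               throw new RuntimeException(failure);
             }
             for (Operation<ResultT, ?> awaitOperation : operations) {
               awaitOperation.await();
             }
             // Run all the operations again.
             run(false);
           } else if (retryType == RetryType.DONT_RETRY) {
             // ... existing DONT_RETRY handling ...
           } else {
             throw new IllegalStateException("Unexpected RetryType: " + retryType);
           }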

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
##########
@@ -123,6 +123,13 @@ public String getTableSpec() {
     return tableSpec;
   }
 
+  public String getTableUrn() {

Review comment:
       Having both "getTableSpec" and "getTableUrn" can be confusing. At least we should clearly document the difference.
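       For example (my understanding of the two formats; please double-check):

           // getTableSpec(): "[project_id]:[dataset_id].[table_id]" -- the form used elsewhere in
           //   BigQueryIO, e.g. in job configurations.
           // getTableUrn():  "projects/{project}/datasets/{dataset}/tables/{table}" -- the
           //   resource name that the Storage Write API expects (e.g. as the CreateWriteStream
           //   parent).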

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
+import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This DoFn flushes and optionally (if requested) finalizes Storage API streams. */
+public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>, Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageApiFlushAndFinalizeDoFn.class);
+
+  private final BigQueryServices bqServices;
+  @Nullable private DatasetService datasetService = null;
+  private final Counter flushOperationsSent =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSent");
+  private final Counter flushOperationsSucceeded =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSucceeded");
+  private final Counter flushOperationsFailed =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsFailed");
+  private final Counter flushOperationsAlreadyExists =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists");
+  private final Counter flushOperationsInvalidArgument =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument");
+  private final Distribution flushLatencyDistribution =
+      Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs");
+  private final Counter finalizeOperationsSent =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsSent");
+  private final Counter finalizeOperationsSucceeded =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsSucceeded");
+  private final Counter finalizeOperationsFailed =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
+
+  @DefaultSchema(JavaFieldSchema.class)
+  static class Operation implements Comparable<Operation>, Serializable {
+    final long flushOffset;
+    final boolean finalizeStream;
+
+    @SchemaCreate
+    public Operation(long flushOffset, boolean finalizeStream) {
+      this.flushOffset = flushOffset;
+      this.finalizeStream = finalizeStream;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Operation operation = (Operation) o;
+      return flushOffset == operation.flushOffset && finalizeStream == operation.finalizeStream;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(flushOffset, finalizeStream);
+    }
+
+    @Override
+    public int compareTo(Operation other) {
+      int compValue = Long.compare(this.flushOffset, other.flushOffset);
+      if (compValue == 0) {
+        compValue = Boolean.compare(this.finalizeStream, other.finalizeStream);
+      }
+      return compValue;
+    }
+  }
+
+  public StorageApiFlushAndFinalizeDoFn(BigQueryServices bqServices) {
+    this.bqServices = bqServices;
+  }
+
+  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+    if (datasetService == null) {
+      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return datasetService;
+  }
+
+  @SuppressWarnings({"nullness"})
+  @ProcessElement
+  public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation> element)
+      throws Exception {
+    final String streamId = element.getKey();
+    final Operation operation = element.getValue();
+    final DatasetService datasetService = getDatasetService(pipelineOptions);
+    // Flush the stream. If the flush offset < 0, that means we only need to finalize.
+    long offset = operation.flushOffset;
+    if (offset >= 0) {
+      Instant now = Instant.now();
+      RetryManager<FlushRowsResponse, Context<FlushRowsResponse>> retryManager =
+          new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3);
+      retryManager.addOperation(
+          // runOperation
+          c -> {
+            try {
+              flushOperationsSent.inc();
+              return datasetService.flush(streamId, offset);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          },
+          // onError
+          contexts -> {
+            Throwable error = Iterables.getFirst(contexts, null).getError();
+            LOG.warn(
+                "Flush of stream " + streamId + " to offset " + offset + " failed with " + error);
+            flushOperationsFailed.inc();
+            if (error instanceof ApiException) {
+              Code statusCode = ((ApiException) error).getStatusCode().getCode();
+              if (statusCode.equals(Code.ALREADY_EXISTS)) {
+                flushOperationsAlreadyExists.inc();
+                // Implies that we have already flushed up to this point, so don't retry.
+                return RetryType.DONT_RETRY;
+              }
+              if (statusCode.equals(Code.INVALID_ARGUMENT)) {

Review comment:
       This sounds strange. Does "Code.INVALID_ARGUMENT" always mean the stream was already finalized? I wonder if we'll end up swallowing some valid errors here.
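       If INVALID_ARGUMENT can mean other things as well, maybe narrow the check, e.g. (purely
       illustrative; the message match is a guess at how to distinguish the cases):

           if (statusCode.equals(Code.INVALID_ARGUMENT)) {
             String msg = error.getMessage();
             if (msg != null && msg.contains("finalized")) {
               // Stream already finalized: nothing left to flush, safe to ignore.
               return RetryType.DONT_RETRY;
             }
             // Otherwise surface the error rather than swallowing it.
           }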




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org