Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/19 18:16:33 UTC

[beam] branch master updated: Vortex performance improvement: Enable multiple stream clients per worker (#17550)

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d8bce7e17 Vortex performance improvement: Enable multiple stream clients per worker (#17550)
47d8bce7e17 is described below

commit 47d8bce7e17efd6af18c40be70290b79758241e8
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Thu May 19 11:16:25 2022 -0700

    Vortex performance improvement: Enable multiple stream clients per worker (#17550)
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  18 +++-
 .../StorageApiWriteRecordsInconsistent.java        |   5 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 100 ++++++++++++++-------
 3 files changed, 89 insertions(+), 34 deletions(-)
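
Both new knobs surface as ordinary Beam pipeline options; the flag names derive from the getters added below. A hypothetical invocation (values are examples only, not recommendations):

    --numStorageWriteApiStreamAppendClients=2 \
    --storageApiAppendThresholdRecordCount=100000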

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index a9beb5cbd7c..cfebb754216 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -93,12 +93,22 @@ public interface BigQueryOptions
   void setUseStorageWriteApiAtLeastOnce(Boolean value);
 
   @Description(
-      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
   @Default.Integer(0)
   Integer getNumStorageWriteApiStreams();
 
   void setNumStorageWriteApiStreams(Integer value);
 
+  @Description(
+      "When using the {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method#STORAGE_API_AT_LEAST_ONCE} write method, "
+          + "this option sets the number of stream append clients that will be allocated at a per worker and destination basis. "
+          + "A large value can cause a large pipeline to go over the BigQuery connection quota quickly on a job with "
+          + "enough number of workers. On the case of low-mid volume pipelines using the default configuration should be sufficient.")
+  @Default.Integer(1)
+  Integer getNumStorageWriteApiStreamAppendClients();
+
+  void setNumStorageWriteApiStreamAppendClients(Integer value);
+
   @Description(
       "If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
   Integer getStorageWriteApiTriggeringFrequencySec();
@@ -129,4 +139,10 @@ public interface BigQueryOptions
   Integer getStorageApiAppendThresholdBytes();
 
   void setStorageApiAppendThresholdBytes(Integer value);
+
+  @Description("Maximum (best effort) record count of a single append to the storage API.")
+  @Default.Integer(150000)
+  Integer getStorageApiAppendThresholdRecordCount();
+
+  void setStorageApiAppendThresholdRecordCount(Integer value);
 }
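
For reference, a minimal sketch (not part of this commit) of setting the new options programmatically. PipelineOptionsFactory is the standard Beam entry point; the values here are purely illustrative:

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ConfigureStorageApiOptions {
      public static void main(String[] args) {
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
        // Example value: two append clients per worker and destination (default is 1).
        options.setNumStorageWriteApiStreamAppendClients(2);
        // Example value: cap a single append at ~100k records (default is 150000).
        options.setStorageApiAppendThresholdRecordCount(100000);
      }
    }

Keeping the defaults is reasonable for low- to mid-volume pipelines; raising the client count mainly helps high-throughput jobs, at the cost of more BigQuery connections per worker.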
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index e433925e5b3..35b3ddfd080 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -40,7 +40,6 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
       StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
       BigQueryServices bqServices) {
     this.dynamicDestinations = dynamicDestinations;
-    ;
     this.bqServices = bqServices;
   }
 
@@ -57,7 +56,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
                     dynamicDestinations,
                     bqServices,
                     true,
-                    bigQueryOptions.getStorageApiAppendThresholdBytes()))
+                    bigQueryOptions.getStorageApiAppendThresholdBytes(),
+                    bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
+                    bigQueryOptions.getNumStorageWriteApiStreamAppendClients()))
             .withSideInputs(dynamicDestinations.getSideInputs()));
     return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
   }
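
As context (also not part of this commit), this transform backs BigQueryIO's at-least-once Storage Write API path, so the options threaded through above take effect for writes like the following hypothetical sketch (table and project names are placeholders; the table is assumed to exist):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.transforms.Create;

    public class AtLeastOnceWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of(new TableRow().set("id", 1)).withCoder(TableRowJsonCoder.of()))
            .apply(
                BigQueryIO.writeTableRows()
                    // Placeholder destination; creation is disabled for this sketch.
                    .to("my-project:my_dataset.my_table")
                    .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
        p.run().waitUntilFinish();
      }
    }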
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f033f423466..7995660af58 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,9 +29,12 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -57,7 +60,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.joda.time.Duration;
@@ -79,13 +81,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   private final BigQueryServices bqServices;
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
 
-  // The Guava cache object is threadsafe. However our protocol requires that client pin the
-  // StreamAppendClient
-  // after looking up the cache, and we must ensure that the cache is not accessed in between the
-  // lookup and the pin
-  // (any access of the cache could trigger element expiration). Therefore most used of
-  // APPEND_CLIENTS should
-  // synchronize.
+  /**
+   * The Guava cache object is thread-safe. However, our protocol requires that clients pin the
+   * StreamAppendClient after looking up the cache, and we must ensure that the cache is not
+   * accessed in between the lookup and the pin (any access of the cache could trigger element
+   * expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
+   */
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
           .expireAfterAccess(15, TimeUnit.MINUTES)
@@ -138,7 +139,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                         dynamicDestinations,
                         bqServices,
                         false,
-                        options.getStorageApiAppendThresholdBytes()))
+                        options.getStorageApiAppendThresholdBytes(),
+                        options.getStorageApiAppendThresholdRecordCount(),
+                        options.getNumStorageWriteApiStreamAppendClients()))
                 .withSideInputs(dynamicDestinations.getSideInputs()))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         // Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -171,18 +174,21 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       private final boolean useDefaultStream;
       private DescriptorWrapper descriptorWrapper;
       private Instant nextCacheTickle;
+      private final int clientNumber;
 
       public DestinationState(
           String tableUrn,
           MessageConverter<ElementT> messageConverter,
           DatasetService datasetService,
-          boolean useDefaultStream) {
+          boolean useDefaultStream,
+          int streamAppendClientCount) {
         this.tableUrn = tableUrn;
         this.messageConverter = messageConverter;
         this.pendingMessages = Lists.newArrayList();
         this.datasetService = datasetService;
         this.useDefaultStream = useDefaultStream;
         this.descriptorWrapper = messageConverter.getSchemaDescriptor();
+        this.clientNumber = new Random().nextInt(streamAppendClientCount);
       }
 
       void teardown() {
@@ -197,6 +203,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
       }
 
+      String getStreamAppendClientCacheEntryKey() {
+        if (useDefaultStream) {
+          return getDefaultStreamName() + "-client" + clientNumber;
+        }
+        return this.streamName;
+      }
+
       String createStreamIfNeeded() {
         try {
           if (!useDefaultStream) {
@@ -213,6 +226,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         return this.streamName;
       }
 
+      StreamAppendClient generateClient() throws Exception {
+        return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
+      }
+
       StreamAppendClient getStreamAppendClient(boolean lookupCache) {
         try {
           if (streamAppendClient == null) {
@@ -221,14 +238,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               if (lookupCache) {
                 this.streamAppendClient =
                     APPEND_CLIENTS.get(
-                        streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                streamName, descriptorWrapper.descriptor));
+                        getStreamAppendClientCacheEntryKey(), () -> generateClient());
               } else {
-                this.streamAppendClient =
-                    datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
-                APPEND_CLIENTS.put(streamName, this.streamAppendClient);
+                this.streamAppendClient = generateClient();
+                // overwrite any existing client in the cache
+                APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), streamAppendClient);
               }
               this.streamAppendClient.pin();
             }
@@ -244,7 +258,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       void maybeTickleCache() {
         if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
           synchronized (APPEND_CLIENTS) {
-            APPEND_CLIENTS.getIfPresent(streamName);
+            APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
           }
           nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
         }
@@ -262,12 +276,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
             // cache still contains the object we created before invalidating (in case another
             // thread has already invalidated
             // and recreated the stream).
+            String cacheEntryKey = getStreamAppendClientCacheEntryKey();
             @Nullable
-            StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+            StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
             if (cachedAppendClient != null
                 && System.identityHashCode(cachedAppendClient)
                     == System.identityHashCode(streamAppendClient)) {
-              APPEND_CLIENTS.invalidate(streamName);
+              APPEND_CLIENTS.invalidate(cacheEntryKey);
             }
           }
           streamAppendClient = null;
@@ -327,7 +342,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                 inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
                 if (writeStream.getInflightWaitSeconds() > 5) {
                   LOG.warn(
-                      "Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
+                      "Storage Api write delay more than {} seconds.",
+                      writeStream.getInflightWaitSeconds());
                 }
                 return response;
               } catch (Exception e) {
@@ -335,11 +351,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               }
             },
             contexts -> {
-              LOG.info(
-                  "Append to stream "
-                      + streamName
-                      + " failed with error "
-                      + Iterables.getFirst(contexts, null).getError());
+              LOG.warn(
+                  "Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
+                  streamName,
+                  clientNumber,
+                  retrieveErrorDetails(contexts));
               invalidateWriteStream();
               appendFailures.inc();
               return RetryType.RETRY_ALL_OPERATIONS;
@@ -350,6 +366,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
             new Context<>());
         maybeTickleCache();
       }
+
+      String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
+        return StreamSupport.stream(contexts.spliterator(), false)
+            .map(ctx -> ctx.getError())
+            .map(
+                err ->
+                    String.format(
+                        "message: %s, stacktrace: %s",
+                        err.toString(),
+                        Lists.newArrayList(err.getStackTrace()).stream()
+                            .map(se -> se.toString())
+                            .collect(Collectors.joining("\n"))))
+            .collect(Collectors.joining(","));
+      }
     }
 
     private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
@@ -357,28 +387,32 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
     private transient @Nullable DatasetService datasetService;
     private int numPendingRecords = 0;
     private int numPendingRecordBytes = 0;
-    private static final int FLUSH_THRESHOLD_RECORDS = 150000;
     private final int flushThresholdBytes;
+    private final int flushThresholdCount;
     private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
+    private int streamAppendClientCount;
 
     WriteRecordsDoFn(
         String operationName,
         StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
         BigQueryServices bqServices,
         boolean useDefaultStream,
-        int flushThresholdBytes) {
+        int flushThresholdBytes,
+        int flushThresholdCount,
+        int streamAppendClientCount) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
       this.useDefaultStream = useDefaultStream;
       this.flushThresholdBytes = flushThresholdBytes;
+      this.flushThresholdCount = flushThresholdCount;
+      this.streamAppendClientCount = streamAppendClientCount;
     }
 
     boolean shouldFlush() {
-      return numPendingRecords > FLUSH_THRESHOLD_RECORDS
-          || numPendingRecordBytes > flushThresholdBytes;
+      return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
     }
 
     void flushIfNecessary() throws Exception {
@@ -432,7 +466,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         throw new RuntimeException(e);
       }
       return new DestinationState(
-          tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
+          tableDestination1.getTableUrn(),
+          messageConverter,
+          datasetService,
+          useDefaultStream,
+          streamAppendClientCount);
     }
 
     @ProcessElement
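
To illustrate the fan-out this commit introduces: each DestinationState draws a random clientNumber once in its constructor, so a worker configured with N append clients spreads default-stream appends across up to N cached StreamAppendClient entries per destination. A self-contained sketch of the cache-key scheme (the stream name is a placeholder; the key format mirrors getStreamAppendClientCacheEntryKey above):

    import java.util.Random;

    public class ClientKeySketch {
      public static void main(String[] args) {
        // Mirrors getNumStorageWriteApiStreamAppendClients; example value only.
        int streamAppendClientCount = 3;
        // Placeholder default stream name, in the shape the commit uses.
        String defaultStreamName =
            "projects/my-project/datasets/my_dataset/tables/my_table/streams/_default";
        // Chosen once per DestinationState, as in the committed constructor.
        int clientNumber = new Random().nextInt(streamAppendClientCount);
        // Cache entry key format from getStreamAppendClientCacheEntryKey().
        String cacheEntryKey = defaultStreamName + "-client" + clientNumber;
        System.out.println(cacheEntryKey); // e.g. ..._default-client1
      }
    }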