You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/19 18:16:33 UTC
[beam] branch master updated: Vortex performance improvement: Enable multiple stream clients per worker (#17550)
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 47d8bce7e17 Vortex performance improvement: Enable multiple stream clients per worker (#17550)
47d8bce7e17 is described below
commit 47d8bce7e17efd6af18c40be70290b79758241e8
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Thu May 19 11:16:25 2022 -0700
Vortex performance improvement: Enable multiple stream clients per worker (#17550)
---
.../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 18 +++-
.../StorageApiWriteRecordsInconsistent.java | 5 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 100 ++++++++++++++-------
3 files changed, 89 insertions(+), 34 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index a9beb5cbd7c..cfebb754216 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -93,12 +93,22 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);
@Description(
- "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+ "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();
void setNumStorageWriteApiStreams(Integer value);
+ @Description(
+ "When using the {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method#STORAGE_API_AT_LEAST_ONCE} write method, "
+ + "this option sets the number of stream append clients that will be allocated at a per worker and destination basis. "
+ + "A large value can cause a large pipeline to go over the BigQuery connection quota quickly on a job with "
+ + "enough number of workers. On the case of low-mid volume pipelines using the default configuration should be sufficient.")
+ @Default.Integer(1)
+ Integer getNumStorageWriteApiStreamAppendClients();
+
+ void setNumStorageWriteApiStreamAppendClients(Integer value);
+
@Description(
"If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
Integer getStorageWriteApiTriggeringFrequencySec();
@@ -129,4 +139,10 @@ public interface BigQueryOptions
Integer getStorageApiAppendThresholdBytes();
void setStorageApiAppendThresholdBytes(Integer value);
+
+ @Description("Maximum (best effort) record count of a single append to the storage API.")
+ @Default.Integer(150000)
+ Integer getStorageApiAppendThresholdRecordCount();
+
+ void setStorageApiAppendThresholdRecordCount(Integer value);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index e433925e5b3..35b3ddfd080 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -40,7 +40,6 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
- ;
this.bqServices = bqServices;
}
@@ -57,7 +56,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
dynamicDestinations,
bqServices,
true,
- bigQueryOptions.getStorageApiAppendThresholdBytes()))
+ bigQueryOptions.getStorageApiAppendThresholdBytes(),
+ bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
+ bigQueryOptions.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f033f423466..7995660af58 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,9 +29,12 @@ import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -57,7 +60,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
@@ -79,13 +81,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
- // The Guava cache object is threadsafe. However our protocol requires that client pin the
- // StreamAppendClient
- // after looking up the cache, and we must ensure that the cache is not accessed in between the
- // lookup and the pin
- // (any access of the cache could trigger element expiration). Therefore most used of
- // APPEND_CLIENTS should
- // synchronize.
+ /**
+ * The Guava cache object is thread-safe. However, our protocol requires that the client pin the
+ * StreamAppendClient after looking up the cache, and we must ensure that the cache is not
+ * accessed in between the lookup and the pin (any access of the cache could trigger element
+ * expiration). Therefore, most uses of APPEND_CLIENTS should synchronize.
+ */
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
@@ -138,7 +139,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
dynamicDestinations,
bqServices,
false,
- options.getStorageApiAppendThresholdBytes()))
+ options.getStorageApiAppendThresholdBytes(),
+ options.getStorageApiAppendThresholdRecordCount(),
+ options.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -171,18 +174,21 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
private Instant nextCacheTickle;
+ private final int clientNumber;
public DestinationState(
String tableUrn,
MessageConverter<ElementT> messageConverter,
DatasetService datasetService,
- boolean useDefaultStream) {
+ boolean useDefaultStream,
+ int streamAppendClientCount) {
this.tableUrn = tableUrn;
this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.datasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.descriptorWrapper = messageConverter.getSchemaDescriptor();
+ this.clientNumber = new Random().nextInt(streamAppendClientCount);
}
void teardown() {
@@ -197,6 +203,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}
+ String getStreamAppendClientCacheEntryKey() {
+ if (useDefaultStream) {
+ return getDefaultStreamName() + "-client" + clientNumber;
+ }
+ return this.streamName;
+ }
+
String createStreamIfNeeded() {
try {
if (!useDefaultStream) {
@@ -213,6 +226,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return this.streamName;
}
+ StreamAppendClient generateClient() throws Exception {
+ return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
+ }
+
StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
@@ -221,14 +238,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
if (lookupCache) {
this.streamAppendClient =
APPEND_CLIENTS.get(
- streamName,
- () ->
- datasetService.getStreamAppendClient(
- streamName, descriptorWrapper.descriptor));
+ getStreamAppendClientCacheEntryKey(), () -> generateClient());
} else {
- this.streamAppendClient =
- datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
- APPEND_CLIENTS.put(streamName, this.streamAppendClient);
+ this.streamAppendClient = generateClient();
+ // overwrite whatever client is currently cached under this key
+ APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), streamAppendClient);
}
this.streamAppendClient.pin();
}
@@ -244,7 +258,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
void maybeTickleCache() {
if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
synchronized (APPEND_CLIENTS) {
- APPEND_CLIENTS.getIfPresent(streamName);
+ APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
}
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
}
@@ -262,12 +276,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
// cache still contains the object we created before invalidating (in case another
// thread has already invalidated
// and recreated the stream).
+ String cacheEntryKey = getStreamAppendClientCacheEntryKey();
@Nullable
- StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+ StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
== System.identityHashCode(streamAppendClient)) {
- APPEND_CLIENTS.invalidate(streamName);
+ APPEND_CLIENTS.invalidate(cacheEntryKey);
}
}
streamAppendClient = null;
@@ -327,7 +342,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
if (writeStream.getInflightWaitSeconds() > 5) {
LOG.warn(
- "Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
+ "Storage Api write delay more than {} seconds.",
+ writeStream.getInflightWaitSeconds());
}
return response;
} catch (Exception e) {
@@ -335,11 +351,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
},
contexts -> {
- LOG.info(
- "Append to stream "
- + streamName
- + " failed with error "
- + Iterables.getFirst(contexts, null).getError());
+ LOG.warn(
+ "Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
+ streamName,
+ clientNumber,
+ retrieveErrorDetails(contexts));
invalidateWriteStream();
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
@@ -350,6 +366,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
new Context<>());
maybeTickleCache();
}
+
+ String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
+ return StreamSupport.stream(contexts.spliterator(), false)
+ .map(ctx -> ctx.getError())
+ .map(
+ err ->
+ String.format(
+ "message: %s, stacktrace: %s",
+ err.toString(),
+ Lists.newArrayList(err.getStackTrace()).stream()
+ .map(se -> se.toString())
+ .collect(Collectors.joining("\n"))))
+ .collect(Collectors.joining(","));
+ }
}
private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
@@ -357,28 +387,32 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
- private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
+ private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
+ private int streamAppendClientCount;
WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
boolean useDefaultStream,
- int flushThresholdBytes) {
+ int flushThresholdBytes,
+ int flushThresholdCount,
+ int streamAppendClientCount) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
this.flushThresholdBytes = flushThresholdBytes;
+ this.flushThresholdCount = flushThresholdCount;
+ this.streamAppendClientCount = streamAppendClientCount;
}
boolean shouldFlush() {
- return numPendingRecords > FLUSH_THRESHOLD_RECORDS
- || numPendingRecordBytes > flushThresholdBytes;
+ return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
}
void flushIfNecessary() throws Exception {
@@ -432,7 +466,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
throw new RuntimeException(e);
}
return new DestinationState(
- tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
+ tableDestination1.getTableUrn(),
+ messageConverter,
+ datasetService,
+ useDefaultStream,
+ streamAppendClientCount);
}
@ProcessElement