Posted to github@beam.apache.org by "jackdingilian (via GitHub)" <gi...@apache.org> on 2023/05/09 21:47:25 UTC

[GitHub] [beam] jackdingilian commented on a diff in pull request #26355: Add ReadChangeStream end time logic, minor cleanup

jackdingilian commented on code in PR #26355:
URL: https://github.com/apache/beam/pull/26355#discussion_r1189160458


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java:
##########
@@ -100,9 +101,30 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
     dataSettingsBuilder.setInstanceId(instanceId);
     tableAdminSettingsBuilder.setInstanceId(instanceId);
 
-    if (appProfileId != null) {
-      dataSettingsBuilder.setAppProfileId(appProfileId);
-    }
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();

Review Comment:
   It is currently mandatory; we already call `checkArgumentNotNull(bigtableConfig.getAppProfileId())` above. We are going to update this to fall back to the default app profile in a follow-up change, though.
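   A minimal sketch of the null-check pattern discussed here (not Beam code: `checkArgumentNotNull` is re-implemented locally to mimic Beam's `Preconditions.checkArgumentNotNull`, and `Optional` stands in for the config's value provider, which is an assumption for illustration):

   ```java
   import java.util.Optional;

   public class AppProfileCheck {
     // Stand-in for Beam's Preconditions.checkArgumentNotNull:
     // rejects null with IllegalArgumentException, otherwise returns the value.
     static <T> T checkArgumentNotNull(T value) {
       if (value == null) {
         throw new IllegalArgumentException("argument must not be null");
       }
       return value;
     }

     public static void main(String[] args) {
       // Hypothetical config value; the real code reads it from BigtableConfig.
       Optional<String> appProfile = Optional.of("my-app-profile");
       String appProfileId = checkArgumentNotNull(appProfile.orElse(null));
       System.out.println(appProfileId);
     }
   }
   ```

   Making the check fail fast here (rather than passing `null` through to the settings builder) surfaces a misconfigured pipeline at construction time instead of at first RPC.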



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java:
##########
@@ -160,6 +167,22 @@ public ProcessContinuation run(
     // Process CloseStream if it exists
     CloseStream closeStream = tracker.currentRestriction().getCloseStream();
     if (closeStream != null) {
+      // tracker.currentRestriction().closeStream.getStatus()

Review Comment:
   Yes. Removed, thanks



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java:
##########
@@ -100,9 +101,30 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
     dataSettingsBuilder.setInstanceId(instanceId);
     tableAdminSettingsBuilder.setInstanceId(instanceId);
 
-    if (appProfileId != null) {
-      dataSettingsBuilder.setAppProfileId(appProfileId);
-    }
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();
+    dataSettingsBuilder.setAppProfileId(appProfileId);
+
+    dataSettingsBuilder
+        .stubSettings()
+        .setTransportChannelProvider(
+            EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder()
+                .setAttemptDirectPath(false) // Disable DirectPath
+                .setChannelPoolSettings( // Autoscale Channel Size
+                    ChannelPoolSettings.builder()
+                        // Make sure that there are at least 2 channels regardless of RPCs
+                        .setMinChannelCount(2)
+                        // Limit number of channels to 100 regardless of QPS
+                        .setMaxChannelCount(100)
+                        // Start off with 5
+                        .setInitialChannelCount(5)
+                        // Make sure the channels are primed before use
+                        .setPreemptiveRefreshEnabled(true)
+                        // Evict channels when there are fewer than 10 outstanding RPCs
+                        .setMinRpcsPerChannel(10)
+                        // Add more channels when a channel reaches 50 outstanding RPCs
+                        .setMaxRpcsPerChannel(50)
+                        .build())

Review Comment:
   The channel pool settings are somewhat arbitrary. The main goal is to increase the number of concurrent streams per worker. We expect at most `workerHarnessThreads` open streams per Dataflow worker (because we use the blocking stream APIs), and that defaults to 500. Before this change the channel pool size was 2, and with a maximum of 100 concurrent streams per channel we were effectively capping the number of active streams at 200. Added a comment.
   
   The 10 attempts below match the default for other Bigtable RPCs. We noticed that these calls occasionally threw exceptions for transient failures that should have been retried, while other methods did not.
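   As a sanity check on the arithmetic above (a hypothetical sketch, not Beam code; the 100-streams-per-channel figure and the 500-thread Dataflow default are taken from the comment above):

   ```java
   public class ChannelPoolMath {
     // At most 100 concurrent streams per gRPC channel, per the comment above.
     static final int STREAMS_PER_CHANNEL = 100;

     // Upper bound on simultaneously open streams for a given pool size.
     static int maxConcurrentStreams(int channelCount) {
       return channelCount * STREAMS_PER_CHANNEL;
     }

     public static void main(String[] args) {
       int workerHarnessThreads = 500; // Dataflow default per the comment above
       int oldCap = maxConcurrentStreams(2);   // fixed pool size before this change
       int newCap = maxConcurrentStreams(100); // setMaxChannelCount(100) ceiling
       System.out.println("old cap: " + oldCap
           + " (covers " + workerHarnessThreads + " threads: " + (oldCap >= workerHarnessThreads) + ")");
       System.out.println("new cap: " + newCap
           + " (covers " + workerHarnessThreads + " threads: " + (newCap >= workerHarnessThreads) + ")");
     }
   }
   ```

   This is why the fixed pool of 2 channels starved workers (200 < 500), while letting the pool scale up to 100 channels leaves ample headroom.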



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
