Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/05/06 19:56:00 UTC

[jira] [Work logged] (BEAM-8376) Add FirestoreIO connector to Java SDK

     [ https://issues.apache.org/jira/browse/BEAM-8376?focusedWorklogId=592991&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-592991 ]

ASF GitHub Bot logged work on BEAM-8376:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/21 19:55
            Start Date: 06/May/21 19:55
    Worklog Time Spent: 10m 
      Work Description: danthev commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r627712849



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -204,7 +210,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
       state.checkActive();
       Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
       if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
-        logger.info("Delaying request by {}ms", shouldThrottleRequest.getMillis());
+        long throttleRequestMillis = shouldThrottleRequest.getMillis();
+        logger.debug("Delaying request by {}ms", throttleRequestMillis);

Review comment:
       This is when the adaptive throttler backs off, right? I think I'd tend toward an `INFO` log here, with a `due to previous failures` note in the message.
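       To make that concrete, a minimal sketch of what the suggested change could look like (the class and method names are hypothetical stand-ins; the Joda `Duration` and SLF4J logger mirror what the diff already uses):
       
       ```java
       import org.joda.time.Duration;
       import org.slf4j.Logger;
       import org.slf4j.LoggerFactory;
       
       // Hypothetical illustration only; the real code lives inside RpcQosImpl's
       // awaitSafeToProceed and uses its own fields.
       class ThrottleLoggingSketch {
         private static final Logger logger = LoggerFactory.getLogger(ThrottleLoggingSketch.class);
       
         void logIfThrottled(Duration shouldThrottleRequest) {
           if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
             long throttleRequestMillis = shouldThrottleRequest.getMillis();
             // INFO level so users can easily spot adaptive-throttler back-off,
             // with the cause called out in the message.
             logger.info("Delaying request by {}ms due to previous failures", throttleRequestMillis);
           }
         }
       }
       ```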

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -94,106 +108,124 @@ void handleWriteFailures(ContextAdapter<WriteFailure> context, Instant timestamp
 
   /**
    * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
-   * <p/>
-   * Writes will be enqueued to be sent at a potentially
-   * later time when more writes are available. This Fn attempts to maximize throughput while
-   * maintaining a high request success rate.
-   * <p/>
-   * All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   *
+   * <p>Writes will be enqueued to be sent at a potentially later time when more writes are
+   * available. This Fn attempts to maximize throughput while maintaining a high request success
+   * rate.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
    * the lifecycle of this Fn.
    */
-  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, Out> implements
-      HasRpcAttemptContext {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+  abstract static class BaseBatchWriteFn<OutT> extends WindowAwareDoFn<Write, OutT>
+      implements HasRpcAttemptContext {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
     private final JodaClock clock;
     private final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
     private final RpcQosOptions rpcQosOptions;
+    private final CounterFactory counterFactory;
+    private final V1FnRpcAttemptContext rpcAttemptContext;
 
     // transient running state information, not important to any possible checkpointing
-    private transient FirestoreRpc firestoreRpc;
+    //  worker scoped state
     private transient RpcQos rpcQos;
-    private transient String projectId;
+    private transient Counter writesSuccessful;
+    private transient Counter writesFailedRetryable;
+    private transient Counter writesFailedNonRetryable;
+    //  bundle scoped state
+    private transient FirestoreStub firestoreStub;
+    private transient DatabaseRootName databaseRootName;
+
     @VisibleForTesting
     transient Queue<@NonNull WriteElement> writes = new PriorityQueue<>(WriteElement.COMPARATOR);
-    @VisibleForTesting
-    transient int queueNextEntryPriority = 0;
 
-    @SuppressWarnings("initialization.fields.uninitialized") // allow transient fields to be managed by component lifecycle
+    @VisibleForTesting transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings(
+        "initialization.fields.uninitialized") // allow transient fields to be managed by component
+    // lifecycle
     BaseBatchWriteFn(
         JodaClock clock,
         FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
-        RpcQosOptions rpcQosOptions
-    ) {
+        RpcQosOptions rpcQosOptions,
+        CounterFactory counterFactory) {
       this.clock = clock;
       this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory;
       this.rpcQosOptions = rpcQosOptions;
-    }
-
-    Logger getLogger() {
-      return LOGGER;
+      this.counterFactory = counterFactory;
+      this.rpcAttemptContext = V1FnRpcAttemptContext.BatchWrite;
     }
 
     @Override
     public Context getRpcAttemptContext() {
-      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+      return rpcAttemptContext;
     }
 
     @Override
-    public final void populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
-      builder
-          .include("rpcQosOptions", rpcQosOptions);
+    public final void populateDisplayData(
+        @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {

Review comment:
       Not checkerframework's `NonNull`?
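       For reference, a hedged sketch of the alternative this question points at, using Checker Framework's `@NonNull` instead of the FindBugs one (the class name and display-data item are made up for illustration):
       
       ```java
       import org.apache.beam.sdk.transforms.display.DisplayData;
       import org.apache.beam.sdk.transforms.display.HasDisplayData;
       import org.checkerframework.checker.nullness.qual.NonNull;
       
       // Hypothetical stand-in class; only the annotation choice is the point here.
       class DisplayDataSketch implements HasDisplayData {
         @Override
         public void populateDisplayData(DisplayData.@NonNull Builder builder) {
           // Checker Framework's @NonNull is a type-use annotation, so it sits next
           // to the nested Builder type rather than in front of the qualified name,
           // unlike the edu.umd.cs.findbugs declaration annotation.
           builder.add(DisplayData.item("example", "value"));
         }
       }
       ```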

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -344,7 +354,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
       Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
       if (shouldThrottle.isPresent()) {
         Duration throttleDuration = shouldThrottle.get();
-        getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDuration.getMillis());
+        long throttleDurationMillis = throttleDuration.getMillis();
+        getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDurationMillis);

Review comment:
       Same here with `INFO` logs. If there's a better way, that's fine too; what's important is that it isn't too hard for the user to find out whether they're being throttled by ramp-up or by the adaptive throttler.
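       One possible way to keep the two throttle sources distinguishable in the logs, sketched with hypothetical names (the real method sits inside RpcQosImpl and uses `getLogger()`):
       
       ```java
       import org.joda.time.Duration;
       import org.slf4j.Logger;
       import org.slf4j.LoggerFactory;
       
       // Hypothetical illustration of distinguishable throttle logging.
       class RampUpLoggingSketch {
         private static final Logger logger = LoggerFactory.getLogger(RampUpLoggingSketch.class);
       
         void logRampUpThrottle(Duration throttleDuration) {
           long throttleDurationMillis = throttleDuration.getMillis();
           // INFO, and explicitly attributed to ramp-up, so users can tell it apart
           // from the adaptive throttler's "due to previous failures" message.
           logger.info("Delaying request by {}ms while writes are still ramping up", throttleDurationMillis);
         }
       }
       ```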

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -83,48 +81,60 @@
   private final WriteRampUp writeRampUp;
   private final FluentBackoff fb;
 
-  private final WeakHashMap<Context, Counters> counters;
+  private final WeakHashMap<Context, O11y> counters;
   private final Random random;
   private final Sleeper sleeper;
-  private final Function<Context, Counters> computeCounters;
+  private final Function<Context, O11y> computeCounters;
+  private final DistributionFactory distributionFactory;
 
   RpcQosImpl(
       RpcQosOptions options,
       Random random,
       Sleeper sleeper,
-      CounterFactory counterFactory
-  ) {
+      CounterFactory counterFactory,
+      DistributionFactory distributionFactory) {
     this.options = options;
     this.random = random;
     this.sleeper = sleeper;
-    at = new AdaptiveThrottler();
-    wb = new WriteBatcher();
-    writeRampUp = new WriteRampUp(
-        Math.max(1, 500 / options.getHintMaxNumWorkers())
-    );
-    fb = FluentBackoff.DEFAULT
-        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an inclusive value, we want exclusive since we are tracking all attempts
-        .withInitialBackoff(options.getInitialBackoff());
+    DistributionFactory filteringDistributionFactory =
+        new DiagnosticOnlyFilteringDistributionFactory(
+            !options.isShouldReportDiagnosticMetrics(), distributionFactory);
+    this.distributionFactory = filteringDistributionFactory;
+    at =
+        new AdaptiveThrottler(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getThrottleDuration(),
+            options.getOverloadRatio());
+    wb =
+        new WriteBatcher(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getBatchInitialCount(),
+            options.getBatchTargetLatency(),
+            filteringDistributionFactory);
+    writeRampUp =
+        new WriteRampUp(
+            Math.max(1, 500 / options.getHintMaxNumWorkers()), filteringDistributionFactory);

Review comment:
       You can leave this if you want, but if it's not too much effort, you could make the base budget a float and move `Math.max(1, ...)` into the per-second budget calculation. That way ramp-up isn't automatically faster for `hintMaxNumWorkers > 500`; instead it stays at the minimum level a little longer before actually ramping up.
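       A hedged sketch of the arithmetic behind that suggestion (field names, method names, and the growth curve below are stand-ins, not the connector's actual ramp-up formula):
       
       ```java
       // Hypothetical sketch: keep the base budget fractional and clamp only when the
       // per-second budget is actually computed.
       class RampUpBudgetSketch {
         private final double baseBatchBudget; // may be < 1 when hintMaxNumWorkers > 500
       
         RampUpBudgetSketch(int hintMaxNumWorkers) {
           this.baseBatchBudget = 500.0 / hintMaxNumWorkers;
         }
       
         int budgetForSecond(long secondsSinceFirstWrite) {
           // Placeholder growth curve; only the location of the clamp matters for the
           // point being made in the review comment.
           double grown = baseBatchBudget * Math.pow(1.5, secondsSinceFirstWrite / 300.0);
           // Clamping here lets a fractional base budget sit at the floor of 1 until
           // growth pushes it above 1, instead of being rounded up before growth.
           return Math.max(1, (int) Math.floor(grown));
         }
       }
       ```
       
       For example, with `hintMaxNumWorkers = 1000` the base budget is 0.5, so each worker holds at a budget of 1 until the growth factor doubles the fractional value, rather than starting from 1 and growing immediately.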




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 592991)
    Time Spent: 23h 10m  (was: 23h)

> Add FirestoreIO connector to Java SDK
> -------------------------------------
>
>                 Key: BEAM-8376
>                 URL: https://issues.apache.org/jira/browse/BEAM-8376
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Stefan Djelekar
>            Priority: P3
>          Time Spent: 23h 10m
>  Remaining Estimate: 0h
>
> Motivation:
> There is no Firestore connector for Java SDK at the moment.
> Having it will enhance the integrations with database options on the Google Cloud Platform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)