Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/07 19:58:38 UTC

[GitHub] [beam] igorbernstein2 commented on a diff in pull request #17823: [BEAM-14554] Create setting to report Bigtable client throttling time to Dataflow

igorbernstein2 commented on code in PR #17823:
URL: https://github.com/apache/beam/pull/17823#discussion_r911422373


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -97,6 +102,12 @@ abstract Builder setBigtableOptionsConfigurator(
 
     abstract Builder setEmulatorHost(String emulatorHost);
 
+    /*
+     This feature is experimental and may be changed and relocated in the future
+    */

Review Comment:
   You don't need the Experimental note here, since this is a package-private class. But it could use a bit more documentation explaining what the setting does; at the very least, add a reference to the docs in BigtableIO.
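   A minimal sketch of the kind of documentation meant here. It assumes a hypothetical builder setter named `setReportThrottlingTime` (the real setter name is not visible in this hunk) and an assumed default of `false`. The Javadoc describes the setting using only what the PR title and the `throttling-msecs` counter in `BigtableIO` indicate, and it points readers to the `BigtableIO` docs instead of carrying an experimental note:

```java
/**
 * Hypothetical, simplified stand-in for the package-private BigtableConfig. The setter
 * name setReportThrottlingTime is illustrative only; the real name is not in this hunk.
 */
final class BigtableConfigSketch {
  private final boolean reportThrottlingTime;

  private BigtableConfigSketch(boolean reportThrottlingTime) {
    this.reportThrottlingTime = reportThrottlingTime;
  }

  boolean getReportThrottlingTime() {
    return reportThrottlingTime;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    // Assumed default for this sketch: reporting is off unless explicitly enabled.
    private boolean reportThrottlingTime = false;

    /**
     * When enabled, Bigtable writes report the time the Bigtable client spent throttling
     * requests to the runner (for example Dataflow) through a "throttling-msecs" counter.
     *
     * <p>See the {@code BigtableIO} class documentation for how this setting is used.
     */
    Builder setReportThrottlingTime(boolean enabled) {
      this.reportThrottlingTime = enabled;
      return this;
    }

    BigtableConfigSketch build() {
      return new BigtableConfigSketch(reportThrottlingTime);
    }
  }
}
```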



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -813,12 +839,34 @@ public String toString() {
     }
   }
 
-  private static class BigtableWriterFn
+  @VisibleForTesting
+  static class BigtableWriterFn
       extends DoFn<KV<ByteString, Iterable<Mutation>>, BigtableWriteResult> {
 
-    BigtableWriterFn(BigtableConfig bigtableConfig) {
+    protected final Counter cumulativeThrottlingMilliseconds =
+        Metrics.counter(BigtableWriterFn.class, "throttling-msecs");
+    private static Object metricLock = new Object();
+    private static long lastAggregatedThrottleTime = 0;
+    private ResourceStatsSupplier statsSupplier;
+
+    BigtableWriterFn(BigtableConfig bigtableConfig, ResourceStatsSupplier statsSupplier) {
       this.config = bigtableConfig;
       this.failures = new ConcurrentLinkedQueue<>();
+      this.statsSupplier = statsSupplier;
+    }
+
+    public static BigtableWriterFn create(BigtableConfig bigtableConfig) {
+      if (bigtableConfig.getProjectId() == null || bigtableConfig.getInstanceId() == null) {
+        return new BigtableWriterFn(
+            bigtableConfig,
+            new ResourceStatsSupplierImpl(
+                bigtableConfig.getBigtableOptions().getProjectId(),
+                bigtableConfig.getBigtableOptions().getInstanceId()));

Review Comment:
   I think this breaks templates.
   
   There are two phases in a Dataflow job: graph construction and execution. During construction, all of the DoFns are instantiated, but none of the ValueProviders are populated. Then, during execution, all of the ValueProviders are populated and the DoFns are executed.
   
   Here you are trying to resolve ValueProviders at construction time, which will break the template mechanism.
   You can do one of two things:
   
   1. Pass a ValueProvider for the instance name (that ValueProvider would take the BigtableConfig as its input).
   2. Pass the BigtableConfig and resolve the instance name later, at execution time.
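   The two phases can be sketched without Beam, under stated assumptions: `Provider` below is a hypothetical, simplified stand-in for Beam's `ValueProvider` (mirroring only its `isAccessible()` and `get()` methods), `DeferredProvider` models a template parameter that is filled in later, and `setup()` stands in for a DoFn's `@Setup` method. The sketch follows option 2: the constructor only stores the unresolved provider, and the instance id is resolved after the execution phase has populated it.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for Beam's ValueProvider. */
interface Provider<T> extends Serializable {
  boolean isAccessible();

  T get();
}

/** Models a template parameter: empty during graph construction, filled in at execution. */
final class DeferredProvider<T> implements Provider<T> {
  private final AtomicReference<T> value = new AtomicReference<>();

  // Called when the (simulated) execution phase starts.
  void populate(T v) {
    value.set(v);
  }

  @Override
  public boolean isAccessible() {
    return value.get() != null;
  }

  @Override
  public T get() {
    T v = value.get();
    if (v == null) {
      throw new IllegalStateException("Value is not available at graph construction time");
    }
    return v;
  }
}

/** Option 2: keep the unresolved provider and resolve the instance id lazily. */
final class WriterFnSketch implements Serializable {
  private final Provider<String> instanceId; // stored unresolved at construction time
  private transient String resolvedInstanceId; // filled in at execution time

  WriterFnSketch(Provider<String> instanceId) {
    this.instanceId = instanceId; // no get() here: this runs during graph construction
  }

  /** Stand-in for a DoFn @Setup method, which runs during execution. */
  void setup() {
    resolvedInstanceId = instanceId.get();
  }

  String instanceId() {
    return resolvedInstanceId;
  }
}
```

Calling `get()` inside the constructor is the pattern that fails for a template parameter; deferring that call until setup avoids it.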



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1935,3 +1965,21 @@ abstract static class Builder {
     }
   }
 }
+
+class FakeResourceStatsSupplier implements BigtableIO.ResourceStatsSupplier, Serializable {

Review Comment:
   Please keep these as private static classes inside the test class.
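   A minimal sketch of the requested layout, using hypothetical, simplified names (`BigtableIOTestLayoutSketch`, `StatsSupplier`, `FakeStatsSupplier`) in place of the real test class and `BigtableIO.ResourceStatsSupplier`. The comments give two common reasons for `private static`; they are general Java considerations, not necessarily the reviewer's:

```java
import java.io.Serializable;

/** Hypothetical, simplified layout of the test class; names are illustrative. */
class BigtableIOTestLayoutSketch {

  /** Simplified stand-in for BigtableIO.ResourceStatsSupplier. */
  interface StatsSupplier {
    long cumulativeThrottlingTimeNanos();
  }

  // private: only this test class can reference the fake, so other classes in the
  // same package cannot start depending on it.
  // static: it holds no reference to an enclosing test instance, so serializing it
  // (as Beam does with DoFns and their fields) does not drag the test class along.
  private static class FakeStatsSupplier implements StatsSupplier, Serializable {
    @Override
    public long cumulativeThrottlingTimeNanos() {
      return 0L;
    }
  }

  // Accessor used by the test methods of this class.
  static StatsSupplier newFakeStatsSupplier() {
    return new FakeStatsSupplier();
  }
}
```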



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1935,3 +1965,21 @@ abstract static class Builder {
     }
   }
 }
+
+class FakeResourceStatsSupplier implements BigtableIO.ResourceStatsSupplier, Serializable {

Review Comment:
   Also, to make it easier to track down where the magic numbers come from, I would add a constructor that takes the fake cumulativeThrottlingTimeNanos and passes it down into FakeResourceStats. That way, the call site shows where the numbers come from.
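   A minimal sketch of that constructor, assuming simplified, hypothetical stand-ins (`StatsView`, `FakeResourceStatsSketch`, `FakeResourceStatsSupplierSketch`) for `ResourceLimiterStats`, `FakeResourceStats`, and `FakeResourceStatsSupplier`. The supplier takes the fake throttling time and hands it down to the stats object, so a test can write `new FakeResourceStatsSupplierSketch(TimeUnit.MILLISECONDS.toNanos(3))` and the expected value is visible at the call site:

```java
import java.io.Serializable;

/** Hypothetical stand-in for the stats object returned by the supplier. */
interface StatsView {
  long getCumulativeThrottlingTimeNanos();
}

/** Fake stats: the throttling time is injected rather than hard-coded. */
final class FakeResourceStatsSketch implements StatsView, Serializable {
  private final long cumulativeThrottlingTimeNanos;

  FakeResourceStatsSketch(long cumulativeThrottlingTimeNanos) {
    this.cumulativeThrottlingTimeNanos = cumulativeThrottlingTimeNanos;
  }

  @Override
  public long getCumulativeThrottlingTimeNanos() {
    return cumulativeThrottlingTimeNanos;
  }
}

/** Fake supplier: its constructor takes the fake value and passes it down. */
final class FakeResourceStatsSupplierSketch implements Serializable {
  private final long cumulativeThrottlingTimeNanos;

  FakeResourceStatsSupplierSketch(long cumulativeThrottlingTimeNanos) {
    this.cumulativeThrottlingTimeNanos = cumulativeThrottlingTimeNanos;
  }

  StatsView getStats() {
    return new FakeResourceStatsSketch(cumulativeThrottlingTimeNanos);
  }
}
```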



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -813,12 +839,34 @@ public String toString() {
     }
   }
 
-  private static class BigtableWriterFn
+  @VisibleForTesting
+  static class BigtableWriterFn
       extends DoFn<KV<ByteString, Iterable<Mutation>>, BigtableWriteResult> {
 
-    BigtableWriterFn(BigtableConfig bigtableConfig) {
+    protected final Counter cumulativeThrottlingMilliseconds =
+        Metrics.counter(BigtableWriterFn.class, "throttling-msecs");
+    private static Object metricLock = new Object();
+    private static long lastAggregatedThrottleTime = 0;
+    private ResourceStatsSupplier statsSupplier;

Review Comment:
   final?
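   A minimal sketch of the suggestion, using a hypothetical, simplified supplier type (`StatsSupplierSketch`): since `statsSupplier` is assigned once in the constructor in this hunk, it can be declared `final`, which lets the compiler reject any later reassignment. The `requireNonNull` check is an extra design choice of this sketch, not part of the PR:

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical, simplified stand-in for BigtableIO.ResourceStatsSupplier. */
interface StatsSupplierSketch extends Serializable {
  long throttledNanos();
}

final class WriterFnFinalSketch implements Serializable {
  // Assigned exactly once, in the constructor, so it can be final.
  private final StatsSupplierSketch statsSupplier;

  WriterFnFinalSketch(StatsSupplierSketch statsSupplier) {
    // Fail fast on a missing supplier instead of at first use.
    this.statsSupplier = Objects.requireNonNull(statsSupplier, "statsSupplier");
  }

  long currentThrottledNanos() {
    return statsSupplier.throttledNanos();
  }
}
```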



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1431,4 +1492,24 @@ static void validateTableExists(BigtableConfig config, PipelineOptions options)
       }
     }
   }
+
+  interface ResourceStatsSupplier {
+    ResourceLimiterStats getStats();
+  }
+
+  static class ResourceStatsSupplierImpl implements ResourceStatsSupplier, Serializable {

Review Comment:
   Should this be private?
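   A minimal sketch of one way to answer this, using hypothetical names (`BigtableIOSketch`, `StatsSupplier`, `StatsSupplierImpl`): keep the interface package-private so tests in the same package can pass a fake, and make the real implementation a private nested class reachable only through a factory method. This assumes, as the `create` method in this PR suggests, that only code inside `BigtableIO` (including its nested classes) constructs the implementation:

```java
import java.io.Serializable;

/** Hypothetical, simplified stand-in for BigtableIO. */
final class BigtableIOSketch {
  private BigtableIOSketch() {}

  /** Package-private so tests in the same package can supply a fake. */
  interface StatsSupplier {
    long cumulativeThrottlingTimeNanos();
  }

  // Private: nested classes of BigtableIOSketch can still construct it, but the
  // rest of the package cannot. Constructor arguments are omitted in this sketch.
  private static final class StatsSupplierImpl implements StatsSupplier, Serializable {
    @Override
    public long cumulativeThrottlingTimeNanos() {
      // A real implementation would read the client's stats; this sketch returns 0.
      return 0L;
    }
  }

  // The only way for code outside this class to obtain the real implementation.
  static StatsSupplier defaultStatsSupplier() {
    return new StatsSupplierImpl();
  }
}
```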



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org