You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/01/06 17:50:55 UTC

[beam] branch master updated: [BEAM-9055] Unify the config names of Fn Data API across languages.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aaaf6ea  [BEAM-9055] Unify the config names of Fn Data API across languages.
     new ce0ff5e  Merge pull request #10505 from sunjincheng121/BEAM-9055-PR
aaaf6ea is described below

commit aaaf6ea2eca84c945de416979da8d7b328b22bd5
Author: sunjincheng121 <su...@gmail.com>
AuthorDate: Mon Jan 6 14:38:47 2020 +0800

    [BEAM-9055] Unify the config names of Fn Data API across languages.
---
 .../data/BeamFnDataBufferingOutboundObserver.java  | 22 +++++++++++++++++-----
 ...DataSizeBasedBufferingOutboundObserverTest.java |  2 +-
 ...DataTimeBasedBufferingOutboundObserverTest.java |  4 ++--
 .../fn/harness/data/BeamFnDataGrpcClientTest.java  |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index 72ab5d6..bbc2916 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -33,20 +33,26 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
  * {@link BeamFnApi.Elements} message when the buffer threshold is surpassed.
  *
  * <p>The default size-based buffer threshold can be overridden by specifying the experiment {@code
- * beam_fn_api_data_buffer_size_limit=<bytes>}
+ * data_buffer_size_limit=<bytes>}
  *
  * <p>The default time-based buffer threshold can be overridden by specifying the experiment {@code
- * beam_fn_api_data_buffer_time_limit=<milliseconds>}
+ * data_buffer_time_limit_ms=<milliseconds>}
  */
 public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataReceiver<T> {
   // TODO: Consider moving this constant out of this interface
-  /** @deprecated Use BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT instead. */
+  /** @deprecated Use DATA_BUFFER_SIZE_LIMIT instead. */
   @Deprecated String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
 
-  String BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT = "beam_fn_api_data_buffer_size_limit=";
+  /** @deprecated Use DATA_BUFFER_SIZE_LIMIT instead. */
+  @Deprecated String BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT = "beam_fn_api_data_buffer_size_limit=";
+
+  String DATA_BUFFER_SIZE_LIMIT = "data_buffer_size_limit=";
   @VisibleForTesting int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
 
-  String BEAM_FN_API_DATA_BUFFER_TIME_LIMIT = "beam_fn_api_data_buffer_time_limit=";
+  /** @deprecated Use DATA_BUFFER_TIME_LIMIT_MS instead. */
+  @Deprecated String BEAM_FN_API_DATA_BUFFER_TIME_LIMIT = "beam_fn_api_data_buffer_time_limit=";
+
+  String DATA_BUFFER_TIME_LIMIT_MS = "data_buffer_time_limit_ms=";
   long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
 
   static <T> BeamFnDataSizeBasedBufferingOutboundObserver<T> forLocation(
@@ -68,6 +74,9 @@ public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataR
   static int getSizeLimit(PipelineOptions options) {
     List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
     for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+      if (experiment.startsWith(DATA_BUFFER_SIZE_LIMIT)) {
+        return Integer.parseInt(experiment.substring(DATA_BUFFER_SIZE_LIMIT.length()));
+      }
       if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT)) {
         return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT.length()));
       }
@@ -81,6 +90,9 @@ public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataR
   static long getTimeLimit(PipelineOptions options) {
     List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
     for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+      if (experiment.startsWith(DATA_BUFFER_TIME_LIMIT_MS)) {
+        return Long.parseLong(experiment.substring(DATA_BUFFER_TIME_LIMIT_MS.length()));
+      }
       if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT)) {
         return Long.parseLong(experiment.substring(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT.length()));
       }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
index 0e53b26..ed2f700 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
@@ -130,7 +130,7 @@ public class BeamFnDataSizeBasedBufferingOutboundObserverTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     options
         .as(ExperimentalOptions.class)
-        .setExperiments(Arrays.asList("beam_fn_api_data_buffer_size_limit=100"));
+        .setExperiments(Arrays.asList("data_buffer_size_limit=100"));
     CloseableFnDataReceiver<WindowedValue<byte[]>> consumer =
         BeamFnDataBufferingOutboundObserver.forLocation(
             options,
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
index f4effa8..eaf6290 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
@@ -54,7 +54,7 @@ public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     options
         .as(ExperimentalOptions.class)
-        .setExperiments(Arrays.asList("beam_fn_api_data_buffer_time_limit=1"));
+        .setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
     final CountDownLatch waitForFlush = new CountDownLatch(1);
     CloseableFnDataReceiver<WindowedValue<byte[]>> consumer =
         BeamFnDataBufferingOutboundObserver.forLocation(
@@ -80,7 +80,7 @@ public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     options
         .as(ExperimentalOptions.class)
-        .setExperiments(Arrays.asList("beam_fn_api_data_buffer_time_limit=1"));
+        .setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
     BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>> consumer =
         (BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>>)
             BeamFnDataBufferingOutboundObserver.forLocation(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 672d41b..deb6218 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -292,7 +292,7 @@ public class BeamFnDataGrpcClientTest {
       BeamFnDataGrpcClient clientFactory =
           new BeamFnDataGrpcClient(
               PipelineOptionsFactory.fromArgs(
-                      new String[] {"--experiments=beam_fn_api_data_buffer_size_limit=20"})
+                      new String[] {"--experiments=data_buffer_size_limit=20"})
                   .create(),
               (Endpoints.ApiServiceDescriptor descriptor) -> channel,
               OutboundObserverFactory.trivial());