You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/01/06 17:50:55 UTC
[beam] branch master updated: [BEAM-9055] Unify the config names of Fn Data API across languages.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aaaf6ea [BEAM-9055] Unify the config names of Fn Data API across languages.
new ce0ff5e Merge pull request #10505 from sunjincheng121/BEAM-9055-PR
aaaf6ea is described below
commit aaaf6ea2eca84c945de416979da8d7b328b22bd5
Author: sunjincheng121 <su...@gmail.com>
AuthorDate: Mon Jan 6 14:38:47 2020 +0800
[BEAM-9055] Unify the config names of Fn Data API across languages.
---
.../data/BeamFnDataBufferingOutboundObserver.java | 22 +++++++++++++++++-----
...DataSizeBasedBufferingOutboundObserverTest.java | 2 +-
...DataTimeBasedBufferingOutboundObserverTest.java | 4 ++--
.../fn/harness/data/BeamFnDataGrpcClientTest.java | 2 +-
4 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index 72ab5d6..bbc2916 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -33,20 +33,26 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
* {@link BeamFnApi.Elements} message when the buffer threshold is surpassed.
*
* <p>The default size-based buffer threshold can be overridden by specifying the experiment {@code
- * beam_fn_api_data_buffer_size_limit=<bytes>}
+ * data_buffer_size_limit=<bytes>}
*
* <p>The default time-based buffer threshold can be overridden by specifying the experiment {@code
- * beam_fn_api_data_buffer_time_limit=<milliseconds>}
+ * data_buffer_time_limit_ms=<milliseconds>}
*/
public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataReceiver<T> {
// TODO: Consider moving this constant out of this interface
- /** @deprecated Use BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT instead. */
+ /** @deprecated Use DATA_BUFFER_SIZE_LIMIT instead. */
@Deprecated String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
- String BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT = "beam_fn_api_data_buffer_size_limit=";
+ /** @deprecated Use DATA_BUFFER_SIZE_LIMIT instead. */
+ @Deprecated String BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT = "beam_fn_api_data_buffer_size_limit=";
+
+ String DATA_BUFFER_SIZE_LIMIT = "data_buffer_size_limit=";
@VisibleForTesting int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
- String BEAM_FN_API_DATA_BUFFER_TIME_LIMIT = "beam_fn_api_data_buffer_time_limit=";
+ /** @deprecated Use DATA_BUFFER_TIME_LIMIT_MS instead. */
+ @Deprecated String BEAM_FN_API_DATA_BUFFER_TIME_LIMIT = "beam_fn_api_data_buffer_time_limit=";
+
+ String DATA_BUFFER_TIME_LIMIT_MS = "data_buffer_time_limit_ms=";
long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
static <T> BeamFnDataSizeBasedBufferingOutboundObserver<T> forLocation(
@@ -68,6 +74,9 @@ public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataR
static int getSizeLimit(PipelineOptions options) {
List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+ if (experiment.startsWith(DATA_BUFFER_SIZE_LIMIT)) {
+ return Integer.parseInt(experiment.substring(DATA_BUFFER_SIZE_LIMIT.length()));
+ }
if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT)) {
return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT.length()));
}
@@ -81,6 +90,9 @@ public interface BeamFnDataBufferingOutboundObserver<T> extends CloseableFnDataR
static long getTimeLimit(PipelineOptions options) {
List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+ if (experiment.startsWith(DATA_BUFFER_TIME_LIMIT_MS)) {
+ return Long.parseLong(experiment.substring(DATA_BUFFER_TIME_LIMIT_MS.length()));
+ }
if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT)) {
return Long.parseLong(experiment.substring(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT.length()));
}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
index 0e53b26..ed2f700 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserverTest.java
@@ -130,7 +130,7 @@ public class BeamFnDataSizeBasedBufferingOutboundObserverTest {
PipelineOptions options = PipelineOptionsFactory.create();
options
.as(ExperimentalOptions.class)
- .setExperiments(Arrays.asList("beam_fn_api_data_buffer_size_limit=100"));
+ .setExperiments(Arrays.asList("data_buffer_size_limit=100"));
CloseableFnDataReceiver<WindowedValue<byte[]>> consumer =
BeamFnDataBufferingOutboundObserver.forLocation(
options,
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
index f4effa8..eaf6290 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataTimeBasedBufferingOutboundObserverTest.java
@@ -54,7 +54,7 @@ public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
PipelineOptions options = PipelineOptionsFactory.create();
options
.as(ExperimentalOptions.class)
- .setExperiments(Arrays.asList("beam_fn_api_data_buffer_time_limit=1"));
+ .setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
final CountDownLatch waitForFlush = new CountDownLatch(1);
CloseableFnDataReceiver<WindowedValue<byte[]>> consumer =
BeamFnDataBufferingOutboundObserver.forLocation(
@@ -80,7 +80,7 @@ public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
PipelineOptions options = PipelineOptionsFactory.create();
options
.as(ExperimentalOptions.class)
- .setExperiments(Arrays.asList("beam_fn_api_data_buffer_time_limit=1"));
+ .setExperiments(Arrays.asList("data_buffer_time_limit_ms=1"));
BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>> consumer =
(BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>>)
BeamFnDataBufferingOutboundObserver.forLocation(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 672d41b..deb6218 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -292,7 +292,7 @@ public class BeamFnDataGrpcClientTest {
BeamFnDataGrpcClient clientFactory =
new BeamFnDataGrpcClient(
PipelineOptionsFactory.fromArgs(
- new String[] {"--experiments=beam_fn_api_data_buffer_size_limit=20"})
+ new String[] {"--experiments=data_buffer_size_limit=20"})
.create(),
(Endpoints.ApiServiceDescriptor descriptor) -> channel,
OutboundObserverFactory.trivial());