You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by bo...@apache.org on 2021/03/05 18:36:29 UTC
[beam] branch master updated: Remove experiment
--enable_streaming_auto_sharding
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c47ce74 Remove experiement --enable_streaming_auto_sharding
new 4390b70 Merge pull request #14133 from [BEAM-11772, BEAM-11408] Remove Dataflow experiment --enable_streaming_auto_sharding
c47ce74 is described below
commit c47ce74c4b831c4209928278424d2c36bae2dad3
Author: sychen <sy...@google.com>
AuthorDate: Tue Mar 2 14:56:23 2021 -0800
Remove experiement --enable_streaming_auto_sharding
---
.../beam/runners/dataflow/DataflowRunner.java | 7 +---
.../dataflow/DataflowPipelineTranslatorTest.java | 46 ++++++----------------
.../beam/runners/dataflow/DataflowRunnerTest.java | 1 -
.../runners/dataflow/dataflow_runner_test.py | 43 ++++++++++----------
.../runners/dataflow/ptransform_overrides.py | 15 ++++---
5 files changed, 46 insertions(+), 66 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 946f020..cc0708f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1393,11 +1393,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
checkArgument(
options.isEnableStreamingEngine(),
"Runner determined sharding not available in Dataflow for GroupIntoBatches for"
- + " non-Streaming-Engine jobs.");
- checkArgument(
- hasExperiment(options, "enable_streaming_auto_sharding"),
- "Runner determined sharding not enabled in Dataflow for GroupIntoBatches."
- + " Try adding the experiment: --experiments=enable_streaming_auto_sharding.");
+ + " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
+ + " --streaming --enable_streaming_engine");
pcollectionsRequiringAutoSharding.add(pcol);
}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c9c08c3..d0781e4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -1150,7 +1150,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
- private JobSpecification runGroupIntoBatchesAndGetJobSpec(
+ private JobSpecification runStreamingGroupIntoBatchesAndGetJobSpec(
Boolean withShardedKey, List<String> experiments) throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(experiments);
@@ -1179,10 +1179,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
- "enable_streaming_auto_sharding",
- GcpOptions.STREAMING_ENGINE_EXPERIMENT,
- GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
- JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(false, experiments);
+ GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
+ JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(false, experiments);
List<Step> steps = jobSpec.getJob().getSteps();
Step shardedStateStep = steps.get(steps.size() - 1);
Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1197,10 +1195,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
- "enable_streaming_auto_sharding",
- GcpOptions.STREAMING_ENGINE_EXPERIMENT,
- GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
- JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(true, experiments);
+ GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
+ JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
List<Step> steps = jobSpec.getJob().getSteps();
Step shardedStateStep = steps.get(steps.size() - 1);
Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1217,11 +1213,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
- "enable_streaming_auto_sharding",
GcpOptions.STREAMING_ENGINE_EXPERIMENT,
GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
"use_runner_v2"));
- JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(false, experiments);
+ JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(false, experiments);
List<Step> steps = jobSpec.getJob().getSteps();
Step shardedStateStep = steps.get(steps.size() - 1);
Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1247,11 +1242,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
- "enable_streaming_auto_sharding",
GcpOptions.STREAMING_ENGINE_EXPERIMENT,
GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
"use_runner_v2"));
- JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(true, experiments);
+ JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
List<Step> steps = jobSpec.getJob().getSteps();
Step shardedStateStep = steps.get(steps.size() - 1);
Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1291,28 +1285,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testGroupIntoBatchesWithShardedKeyNotSupported() throws IOException {
- List<String> experiments1 =
- new ArrayList<>(
- ImmutableList.of(
- "enable_streaming_auto_sharding",
- GcpOptions.STREAMING_ENGINE_EXPERIMENT,
- GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
- "beam_fn_api"));
- thrown.expect(IllegalArgumentException.class);
- runGroupIntoBatchesAndGetJobSpec(true, experiments1);
-
- List<String> experiments2 = new ArrayList<>(ImmutableList.of("enable_streaming_auto_sharding"));
- thrown.expect(IllegalArgumentException.class);
- runGroupIntoBatchesAndGetJobSpec(true, experiments2);
-
- List<String> experiments3 =
- new ArrayList<>(
- ImmutableList.of(
- GcpOptions.STREAMING_ENGINE_EXPERIMENT,
- GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
- "use_runner_v2"));
+ // Not using streaming engine.
+ List<String> experiments = new ArrayList<>(ImmutableList.of("use_runner_v2"));
thrown.expect(IllegalArgumentException.class);
- runGroupIntoBatchesAndGetJobSpec(true, experiments3);
+ thrown.expectMessage(
+ "Runner determined sharding not available in Dataflow for GroupIntoBatches for non-Streaming-Engine jobs");
+ runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
}
@Test
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9eb318a..d83c20e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1769,7 +1769,6 @@ public class DataflowRunnerTest implements Serializable {
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
- "enable_streaming_auto_sharding",
GcpOptions.STREAMING_ENGINE_EXPERIMENT,
GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
"use_runner_v2"));
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 01d1919..85b481f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -792,8 +792,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
def _run_group_into_batches_and_get_step_properties(
self, with_sharded_key, additional_properties):
self.default_properties.append('--streaming')
- self.default_properties.append(
- '--experiment=enable_streaming_auto_sharding')
for property in additional_properties:
self.default_properties.append(property)
@@ -816,39 +814,42 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
def test_group_into_batches_translation(self):
properties = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+ True, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], u'true')
self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], u'true')
- def test_group_into_batches_translation_non_se(self):
- properties = self._run_group_into_batches_and_get_step_properties(
- True, ['--experiment=use_runner_v2'])
- self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
- self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
- self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
-
def test_group_into_batches_translation_non_sharded(self):
properties = self._run_group_into_batches_and_get_step_properties(
- False, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+ False, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
+ def test_group_into_batches_translation_non_se(self):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'Runner determined sharding not available in Dataflow for '
+ 'GroupIntoBatches for non-Streaming-Engine jobs'):
+ _ = self._run_group_into_batches_and_get_step_properties(
+ True, ['--experiments=use_runner_v2'])
+
def test_group_into_batches_translation_non_unified_worker(self):
# non-portable
- properties = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine'])
- self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
- self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
- self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'Runner determined sharding not available in Dataflow for '
+ 'GroupIntoBatches for jobs not using Runner V2'):
+ _ = self._run_group_into_batches_and_get_step_properties(
+ True, ['--enable_streaming_engine'])
# JRH
- properties = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine', '--experiment=beam_fn_api'])
- self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
- self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
- self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'Runner determined sharding not available in Dataflow for '
+ 'GroupIntoBatches for jobs not using Runner V2'):
+ _ = self._run_group_into_batches_and_get_step_properties(
+ True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
def _test_pack_combiners(self, pipeline_options, expect_packed):
runner = DataflowRunner()
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 402a4ed..8b31629 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -360,14 +360,19 @@ class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
return False
google_cloud_options = self.options.view_as(GoogleCloudOptions)
if not google_cloud_options.enable_streaming_engine:
- return False
+ raise ValueError(
+ 'Runner determined sharding not available in Dataflow for '
+ 'GroupIntoBatches for non-Streaming-Engine jobs. In order to use '
+ 'runner determined sharding, please use '
+ '--streaming --enable_streaming_engine --experiments=use_runner_v2')
from apache_beam.runners.dataflow.internal import apiclient
if not apiclient._use_unified_worker(self.options):
- return False
- experiments = self.options.view_as(DebugOptions).experiments or []
- if 'enable_streaming_auto_sharding' not in experiments:
- return False
+ raise ValueError(
+ 'Runner determined sharding not available in Dataflow for '
+ 'GroupIntoBatches for jobs not using Runner V2. In order to use '
+ 'runner determined sharding, please use '
+ '--streaming --enable_streaming_engine --experiments=use_runner_v2')
self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
return True