You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/03/05 18:36:29 UTC

[beam] branch master updated: Remove experiment --enable_streaming_auto_sharding

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c47ce74  Remove experiment --enable_streaming_auto_sharding
     new 4390b70  Merge pull request #14133 from [BEAM-11772, BEAM-11408] Remove Dataflow experiment --enable_streaming_auto_sharding
c47ce74 is described below

commit c47ce74c4b831c4209928278424d2c36bae2dad3
Author: sychen <sy...@google.com>
AuthorDate: Tue Mar 2 14:56:23 2021 -0800

    Remove experiment --enable_streaming_auto_sharding
---
 .../beam/runners/dataflow/DataflowRunner.java      |  7 +---
 .../dataflow/DataflowPipelineTranslatorTest.java   | 46 ++++++----------------
 .../beam/runners/dataflow/DataflowRunnerTest.java  |  1 -
 .../runners/dataflow/dataflow_runner_test.py       | 43 ++++++++++----------
 .../runners/dataflow/ptransform_overrides.py       | 15 ++++---
 5 files changed, 46 insertions(+), 66 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 946f020..cc0708f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1393,11 +1393,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     checkArgument(
         options.isEnableStreamingEngine(),
         "Runner determined sharding not available in Dataflow for GroupIntoBatches for"
-            + " non-Streaming-Engine jobs.");
-    checkArgument(
-        hasExperiment(options, "enable_streaming_auto_sharding"),
-        "Runner determined sharding not enabled in Dataflow for GroupIntoBatches."
-            + " Try adding the experiment: --experiments=enable_streaming_auto_sharding.");
+            + " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
+            + " --streaming --enable_streaming_engine");
     pcollectionsRequiringAutoSharding.add(pcol);
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c9c08c3..d0781e4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -1150,7 +1150,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
-  private JobSpecification runGroupIntoBatchesAndGetJobSpec(
+  private JobSpecification runStreamingGroupIntoBatchesAndGetJobSpec(
       Boolean withShardedKey, List<String> experiments) throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setExperiments(experiments);
@@ -1179,10 +1179,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     List<String> experiments =
         new ArrayList<>(
             ImmutableList.of(
-                "enable_streaming_auto_sharding",
-                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
-                GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
-    JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(false, experiments);
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
+    JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(false, experiments);
     List<Step> steps = jobSpec.getJob().getSteps();
     Step shardedStateStep = steps.get(steps.size() - 1);
     Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1197,10 +1195,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     List<String> experiments =
         new ArrayList<>(
             ImmutableList.of(
-                "enable_streaming_auto_sharding",
-                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
-                GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
-    JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(true, experiments);
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT));
+    JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
     List<Step> steps = jobSpec.getJob().getSteps();
     Step shardedStateStep = steps.get(steps.size() - 1);
     Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1217,11 +1213,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     List<String> experiments =
         new ArrayList<>(
             ImmutableList.of(
-                "enable_streaming_auto_sharding",
                 GcpOptions.STREAMING_ENGINE_EXPERIMENT,
                 GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
                 "use_runner_v2"));
-    JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(false, experiments);
+    JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(false, experiments);
     List<Step> steps = jobSpec.getJob().getSteps();
     Step shardedStateStep = steps.get(steps.size() - 1);
     Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1247,11 +1242,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     List<String> experiments =
         new ArrayList<>(
             ImmutableList.of(
-                "enable_streaming_auto_sharding",
                 GcpOptions.STREAMING_ENGINE_EXPERIMENT,
                 GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
                 "use_runner_v2"));
-    JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(true, experiments);
+    JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
     List<Step> steps = jobSpec.getJob().getSteps();
     Step shardedStateStep = steps.get(steps.size() - 1);
     Map<String, Object> properties = shardedStateStep.getProperties();
@@ -1291,28 +1285,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
   @Test
   public void testGroupIntoBatchesWithShardedKeyNotSupported() throws IOException {
-    List<String> experiments1 =
-        new ArrayList<>(
-            ImmutableList.of(
-                "enable_streaming_auto_sharding",
-                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
-                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
-                "beam_fn_api"));
-    thrown.expect(IllegalArgumentException.class);
-    runGroupIntoBatchesAndGetJobSpec(true, experiments1);
-
-    List<String> experiments2 = new ArrayList<>(ImmutableList.of("enable_streaming_auto_sharding"));
-    thrown.expect(IllegalArgumentException.class);
-    runGroupIntoBatchesAndGetJobSpec(true, experiments2);
-
-    List<String> experiments3 =
-        new ArrayList<>(
-            ImmutableList.of(
-                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
-                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
-                "use_runner_v2"));
+    // Not using streaming engine.
+    List<String> experiments = new ArrayList<>(ImmutableList.of("use_runner_v2"));
     thrown.expect(IllegalArgumentException.class);
-    runGroupIntoBatchesAndGetJobSpec(true, experiments3);
+    thrown.expectMessage(
+        "Runner determined sharding not available in Dataflow for GroupIntoBatches for non-Streaming-Engine jobs");
+    runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments);
   }
 
   @Test
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9eb318a..d83c20e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1769,7 +1769,6 @@ public class DataflowRunnerTest implements Serializable {
     List<String> experiments =
         new ArrayList<>(
             ImmutableList.of(
-                "enable_streaming_auto_sharding",
                 GcpOptions.STREAMING_ENGINE_EXPERIMENT,
                 GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
                 "use_runner_v2"));
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 01d1919..85b481f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -792,8 +792,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def _run_group_into_batches_and_get_step_properties(
       self, with_sharded_key, additional_properties):
     self.default_properties.append('--streaming')
-    self.default_properties.append(
-        '--experiment=enable_streaming_auto_sharding')
     for property in additional_properties:
       self.default_properties.append(property)
 
@@ -816,39 +814,42 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
 
   def test_group_into_batches_translation(self):
     properties = self._run_group_into_batches_and_get_step_properties(
-        True, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+        True, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
     self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
     self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], u'true')
     self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], u'true')
 
-  def test_group_into_batches_translation_non_se(self):
-    properties = self._run_group_into_batches_and_get_step_properties(
-        True, ['--experiment=use_runner_v2'])
-    self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
-    self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
-    self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
-
   def test_group_into_batches_translation_non_sharded(self):
     properties = self._run_group_into_batches_and_get_step_properties(
-        False, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+        False, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
     self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
     self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
     self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
 
+  def test_group_into_batches_translation_non_se(self):
+    with self.assertRaisesRegex(
+        ValueError,
+        'Runner determined sharding not available in Dataflow for '
+        'GroupIntoBatches for non-Streaming-Engine jobs'):
+      _ = self._run_group_into_batches_and_get_step_properties(
+          True, ['--experiments=use_runner_v2'])
+
   def test_group_into_batches_translation_non_unified_worker(self):
     # non-portable
-    properties = self._run_group_into_batches_and_get_step_properties(
-        True, ['--enable_streaming_engine'])
-    self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
-    self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
-    self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
+    with self.assertRaisesRegex(
+        ValueError,
+        'Runner determined sharding not available in Dataflow for '
+        'GroupIntoBatches for jobs not using Runner V2'):
+      _ = self._run_group_into_batches_and_get_step_properties(
+          True, ['--enable_streaming_engine'])
 
     # JRH
-    properties = self._run_group_into_batches_and_get_step_properties(
-        True, ['--enable_streaming_engine', '--experiment=beam_fn_api'])
-    self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
-    self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
-    self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
+    with self.assertRaisesRegex(
+        ValueError,
+        'Runner determined sharding not available in Dataflow for '
+        'GroupIntoBatches for jobs not using Runner V2'):
+      _ = self._run_group_into_batches_and_get_step_properties(
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def _test_pack_combiners(self, pipeline_options, expect_packed):
     runner = DataflowRunner()
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 402a4ed..8b31629 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -360,14 +360,19 @@ class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
       return False
     google_cloud_options = self.options.view_as(GoogleCloudOptions)
     if not google_cloud_options.enable_streaming_engine:
-      return False
+      raise ValueError(
+          'Runner determined sharding not available in Dataflow for '
+          'GroupIntoBatches for non-Streaming-Engine jobs. In order to use '
+          'runner determined sharding, please use '
+          '--streaming --enable_streaming_engine --experiments=use_runner_v2')
 
     from apache_beam.runners.dataflow.internal import apiclient
     if not apiclient._use_unified_worker(self.options):
-      return False
-    experiments = self.options.view_as(DebugOptions).experiments or []
-    if 'enable_streaming_auto_sharding' not in experiments:
-      return False
+      raise ValueError(
+          'Runner determined sharding not available in Dataflow for '
+          'GroupIntoBatches for jobs not using Runner V2. In order to use '
+          'runner determined sharding, please use '
+          '--streaming --enable_streaming_engine --experiments=use_runner_v2')
 
     self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
     return True