You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/09/14 20:32:13 UTC

[beam] branch master updated: [BEAM-10505][BEAM-10530] Add truncate capability.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb063d  [BEAM-10505][BEAM-10530] Add truncate capability.
     new 2ba24ed  Merge pull request #12773 from boyuanzz/enable_drain
6fb063d is described below

commit 6fb063d72131c5b891de580248514f461b457b44
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Sep 3 15:54:14 2020 -0700

    [BEAM-10505][BEAM-10530] Add truncate capability.
---
 .../apache/beam/runners/core/construction/Environments.java |  4 ++--
 .../beam/runners/core/construction/EnvironmentsTest.java    | 13 ++++++-------
 sdks/python/apache_beam/transforms/environments.py          |  3 +--
 sdks/python/apache_beam/transforms/environments_test.py     |  7 +++----
 4 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 1583d13..0033584 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -39,6 +39,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardArtifacts;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardProtocols;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
@@ -314,8 +315,7 @@ public class Environments {
     capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING));
     capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING));
     capabilities.add("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL);
-    // TODO(BEAM-10505): Add the capability back.
-    // capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
+    capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
     return capabilities.build();
   }
 
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
index a562e33..7d8476d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
@@ -104,13 +104,12 @@ public class EnvironmentsTest implements Serializable {
     assertThat(
         Environments.getJavaCapabilities(),
         hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING)));
-    // TODO(BEAM-10505): Add the check back.
-    // assertThat(
-    //     Environments.getJavaCapabilities(),
-    //     hasItem(
-    //         BeamUrns.getUrn(
-    //             RunnerApi.StandardPTransforms.SplittableParDoComponents
-    //                 .TRUNCATE_SIZED_RESTRICTION)));
+    assertThat(
+        Environments.getJavaCapabilities(),
+        hasItem(
+            BeamUrns.getUrn(
+                RunnerApi.StandardPTransforms.SplittableParDoComponents
+                    .TRUNCATE_SIZED_RESTRICTION)));
   }
 
   @Test
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index 2befe70..a91159c 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -596,8 +596,7 @@ def _python_sdk_capabilities_iter():
   yield common_urns.protocols.WORKER_STATUS.urn
   yield python_urns.PACKED_COMBINE_FN
   yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
-  #TODO(BEAM-10530): Add truncate capability.
-  # yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
+  yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
 
 
 def python_sdk_dependencies(options, tmp_dir=None):
diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py
index 46be840..a313dad 100644
--- a/sdks/python/apache_beam/transforms/environments_test.py
+++ b/sdks/python/apache_beam/transforms/environments_test.py
@@ -73,10 +73,9 @@ class RunnerApiTest(unittest.TestCase):
     sdk_capabilities = environments.python_sdk_capabilities()
     self.assertIn(common_urns.coders.LENGTH_PREFIX.urn, sdk_capabilities)
     self.assertIn(common_urns.protocols.WORKER_STATUS.urn, sdk_capabilities)
-    #TODO(BEAM-10530): Add truncate capability.
-    # self.assertIn(
-    #     common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
-    #     sdk_capabilities)
+    self.assertIn(
+        common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+        sdk_capabilities)
 
   def test_default_capabilities(self):
     environment = DockerEnvironment.from_options(