You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/01/07 17:38:24 UTC

[beam] branch master updated: [BEAM-9059] Migrate PTransformTranslation to use string constants

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d0eb6dd  [BEAM-9059] Migrate PTransformTranslation to use string constants
     new 7e8987a  Merge pull request #10511 from lukecwik/splittabledofn2
d0eb6dd is described below

commit d0eb6dd0cf8305c9b5c0bd53a31783337d9ed939
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Jan 6 16:12:57 2020 -0800

    [BEAM-9059] Migrate PTransformTranslation to use string constants
---
 .../core/construction/PTransformTranslation.java   | 146 +++++++++++++++------
 1 file changed, 105 insertions(+), 41 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index c21cd38..95f50ea 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -60,68 +61,131 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
  * buffers}.
  */
 public class PTransformTranslation {
-
-  public static final String PAR_DO_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.PAR_DO);
-  public static final String FLATTEN_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.FLATTEN);
-  public static final String GROUP_BY_KEY_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY);
-  public static final String IMPULSE_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.IMPULSE);
-  public static final String ASSIGN_WINDOWS_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Primitives.ASSIGN_WINDOWS);
-  public static final String TEST_STREAM_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Primitives.TEST_STREAM);
-  public static final String MAP_WINDOWS_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
-
+  // The URN values are copied here as string literals so that they are compile-time constants
+  // usable in switch case labels; the static block below validates each one against getUrn(...).
+
+  // Primitives
+  public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1";
+  public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1";
+  public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1";
+  public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1";
+  public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1";
+  public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1";
+  public static final String MAP_WINDOWS_TRANSFORM_URN = "beam:transform:map_windows:v1";
+
+  // DeprecatedPrimitives
   /**
    * @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
    *     + SplittableDoFns.
    */
-  @Deprecated
-  public static final String READ_TRANSFORM_URN =
-      getUrn(StandardPTransforms.DeprecatedPrimitives.READ);
+  @Deprecated public static final String READ_TRANSFORM_URN = "beam:transform:read:v1";
+
   /**
    * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
    *     part of the translation for a `ParDo` side input.
    */
   @Deprecated
-  public static final String CREATE_VIEW_TRANSFORM_URN =
-      getUrn(StandardPTransforms.DeprecatedPrimitives.CREATE_VIEW);
+  public static final String CREATE_VIEW_TRANSFORM_URN = "beam:transform:create_view:v1";
 
-  public static final String COMBINE_PER_KEY_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
-  public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
-  public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
-      getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
+  // Composites
+  public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
+  public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
+  public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:v1";
+
+  // CombineComponents
   public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
-      getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
+      "beam:transform:combine_per_key_precombine:v1";
   public static final String COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN =
-      getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
+      "beam:transform:combine_per_key_merge_accumulators:v1";
   public static final String COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN =
-      getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
-  public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE);
-  public static final String WRITE_FILES_TRANSFORM_URN =
-      getUrn(StandardPTransforms.Composites.WRITE_FILES);
+      "beam:transform:combine_per_key_extract_outputs:v1";
+  public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
+      "beam:transform:combine_grouped_values:v1";
 
   // SplittableParDoComponents
   public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
-      getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION);
+      "beam:transform:sdf_pair_with_restriction:v1";
   public static final String SPLITTABLE_SPLIT_RESTRICTION_URN =
-      getUrn(SplittableParDoComponents.SPLIT_RESTRICTION);
+      "beam:transform:sdf_split_restriction:v1";
+  /**
+   * @deprecated runners should move away from using `SplittableProcessKeyedElements` and prefer to
+   *     internalize any necessary SplittableDoFn expansion.
+   */
+  @Deprecated
   public static final String SPLITTABLE_PROCESS_KEYED_URN =
-      getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS);
+      "beam:transform:sdf_process_keyed_elements:v1";
+
   public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
-      getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
+      "beam:transform:sdf_process_elements:v1";
   public static final String SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN =
-      getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS);
+      "beam:transform:sdf_split_and_size_restrictions:v1";
   public static final String SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
-      getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
-
-  public static final String ITERABLE_SIDE_INPUT =
-      getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
-  public static final String MULTIMAP_SIDE_INPUT =
-      getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
+      "beam:transform:sdf_process_sized_element_and_restrictions:v1";
+
+  static {
+    checkState(PAR_DO_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.PAR_DO)));
+    checkState(FLATTEN_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.FLATTEN)));
+    checkState(
+        GROUP_BY_KEY_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY)));
+    checkState(IMPULSE_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.IMPULSE)));
+    checkState(
+        ASSIGN_WINDOWS_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.ASSIGN_WINDOWS)));
+    checkState(
+        TEST_STREAM_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.TEST_STREAM)));
+    checkState(
+        MAP_WINDOWS_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.MAP_WINDOWS)));
+
+    // DeprecatedPrimitives
+    checkState(READ_TRANSFORM_URN.equals(getUrn(StandardPTransforms.DeprecatedPrimitives.READ)));
+    checkState(
+        CREATE_VIEW_TRANSFORM_URN.equals(
+            getUrn(StandardPTransforms.DeprecatedPrimitives.CREATE_VIEW)));
+
+    // Composites
+    checkState(
+        COMBINE_PER_KEY_TRANSFORM_URN.equals(
+            getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY)));
+    checkState(
+        COMBINE_GLOBALLY_TRANSFORM_URN.equals(
+            getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY)));
+    checkState(RESHUFFLE_URN.equals(getUrn(StandardPTransforms.Composites.RESHUFFLE)));
+    checkState(
+        WRITE_FILES_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Composites.WRITE_FILES)));
+
+    // CombineComponents
+    checkState(
+        COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN.equals(
+            getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE)));
+    checkState(
+        COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN.equals(
+            getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS)));
+    checkState(
+        COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN.equals(
+            getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS)));
+    checkState(
+        COMBINE_GROUPED_VALUES_TRANSFORM_URN.equals(
+            getUrn(CombineComponents.COMBINE_GROUPED_VALUES)));
+
+    // SplittableParDoComponents
+    checkState(
+        SPLITTABLE_PAIR_WITH_RESTRICTION_URN.equals(
+            getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION)));
+    checkState(
+        SPLITTABLE_SPLIT_RESTRICTION_URN.equals(
+            getUrn(SplittableParDoComponents.SPLIT_RESTRICTION)));
+    checkState(
+        SPLITTABLE_PROCESS_KEYED_URN.equals(
+            getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS)));
+    checkState(
+        SPLITTABLE_PROCESS_ELEMENTS_URN.equals(getUrn(SplittableParDoComponents.PROCESS_ELEMENTS)));
+    checkState(
+        SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN.equals(
+            getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS)));
+    checkState(
+        SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.equals(
+            getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS)));
+  }
 
   private static final Collection<TransformTranslator<?>> KNOWN_TRANSLATORS =
       loadKnownTranslators();