You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/01/07 17:38:24 UTC
[beam] branch master updated: [BEAM-9059] Migrate
PTransformTranslation to use string constants
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d0eb6dd [BEAM-9059] Migrate PTransformTranslation to use string constants
new 7e8987a Merge pull request #10511 from lukecwik/splittabledofn2
d0eb6dd is described below
commit d0eb6dd0cf8305c9b5c0bd53a31783337d9ed939
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Jan 6 16:12:57 2020 -0800
[BEAM-9059] Migrate PTransformTranslation to use string constants
---
.../core/construction/PTransformTranslation.java | 146 +++++++++++++++------
1 file changed, 105 insertions(+), 41 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index c21cd38..95f50ea 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.util.Collection;
@@ -60,68 +61,131 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
* buffers}.
*/
public class PTransformTranslation {
-
- public static final String PAR_DO_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.PAR_DO);
- public static final String FLATTEN_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.FLATTEN);
- public static final String GROUP_BY_KEY_TRANSFORM_URN =
- getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY);
- public static final String IMPULSE_TRANSFORM_URN = getUrn(StandardPTransforms.Primitives.IMPULSE);
- public static final String ASSIGN_WINDOWS_TRANSFORM_URN =
- getUrn(StandardPTransforms.Primitives.ASSIGN_WINDOWS);
- public static final String TEST_STREAM_TRANSFORM_URN =
- getUrn(StandardPTransforms.Primitives.TEST_STREAM);
- public static final String MAP_WINDOWS_TRANSFORM_URN =
- getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
-
+ // We specifically copy the values here so that they can be used in switch case statements
+ // and we validate that the value matches the actual URN in the static block below.
+
+ // Primitives
+ public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1";
+ public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1";
+ public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1";
+ public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1";
+ public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1";
+ public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1";
+ public static final String MAP_WINDOWS_TRANSFORM_URN = "beam:transform:map_windows:v1";
+
+ // DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
* + SplittableDoFns.
*/
- @Deprecated
- public static final String READ_TRANSFORM_URN =
- getUrn(StandardPTransforms.DeprecatedPrimitives.READ);
+ @Deprecated public static final String READ_TRANSFORM_URN = "beam:transform:read:v1";
+
/**
* @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
* part of the translation for a `ParDo` side input.
*/
@Deprecated
- public static final String CREATE_VIEW_TRANSFORM_URN =
- getUrn(StandardPTransforms.DeprecatedPrimitives.CREATE_VIEW);
+ public static final String CREATE_VIEW_TRANSFORM_URN = "beam:transform:create_view:v1";
- public static final String COMBINE_PER_KEY_TRANSFORM_URN =
- getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
- public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
- getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
- public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
- getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
+ // Composites
+ public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
+ public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
+ public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+ public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:v1";
+
+ // CombineComponents
public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
- getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
+ "beam:transform:combine_per_key_precombine:v1";
public static final String COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN =
- getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
+ "beam:transform:combine_per_key_merge_accumulators:v1";
public static final String COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN =
- getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
- public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE);
- public static final String WRITE_FILES_TRANSFORM_URN =
- getUrn(StandardPTransforms.Composites.WRITE_FILES);
+ "beam:transform:combine_per_key_extract_outputs:v1";
+ public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
+ "beam:transform:combine_grouped_values:v1";
// SplittableParDoComponents
public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
- getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION);
+ "beam:transform:sdf_pair_with_restriction:v1";
public static final String SPLITTABLE_SPLIT_RESTRICTION_URN =
- getUrn(SplittableParDoComponents.SPLIT_RESTRICTION);
+ "beam:transform:sdf_split_restriction:v1";
+ /**
+ * @deprecated runners should move away from using `SplittableProcessKeyedElements` and prefer to
+ * internalize any necessary SplittableDoFn expansion.
+ */
+ @Deprecated
public static final String SPLITTABLE_PROCESS_KEYED_URN =
- getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS);
+ "beam:transform:sdf_process_keyed_elements:v1";
+
public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
- getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
+ "beam:transform:sdf_process_elements:v1";
public static final String SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN =
- getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS);
+ "beam:transform:sdf_split_and_size_restrictions:v1";
public static final String SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
- getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
-
- public static final String ITERABLE_SIDE_INPUT =
- getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
- public static final String MULTIMAP_SIDE_INPUT =
- getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
+ "beam:transform:sdf_process_sized_element_and_restrictions:v1";
+
+ static {
+ checkState(PAR_DO_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.PAR_DO)));
+ checkState(FLATTEN_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.FLATTEN)));
+ checkState(
+ GROUP_BY_KEY_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY)));
+ checkState(IMPULSE_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.IMPULSE)));
+ checkState(
+ ASSIGN_WINDOWS_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.ASSIGN_WINDOWS)));
+ checkState(
+ TEST_STREAM_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.TEST_STREAM)));
+ checkState(
+ MAP_WINDOWS_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.MAP_WINDOWS)));
+
+ // DeprecatedPrimitives
+ checkState(READ_TRANSFORM_URN.equals(getUrn(StandardPTransforms.DeprecatedPrimitives.READ)));
+ checkState(
+ CREATE_VIEW_TRANSFORM_URN.equals(
+ getUrn(StandardPTransforms.DeprecatedPrimitives.CREATE_VIEW)));
+
+ // Composites
+ checkState(
+ COMBINE_PER_KEY_TRANSFORM_URN.equals(
+ getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY)));
+ checkState(
+ COMBINE_GLOBALLY_TRANSFORM_URN.equals(
+ getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY)));
+ checkState(RESHUFFLE_URN.equals(getUrn(StandardPTransforms.Composites.RESHUFFLE)));
+ checkState(
+ WRITE_FILES_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Composites.WRITE_FILES)));
+
+ // CombineComponents
+ checkState(
+ COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN.equals(
+ getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE)));
+ checkState(
+ COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN.equals(
+ getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS)));
+ checkState(
+ COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN.equals(
+ getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS)));
+ checkState(
+ COMBINE_GROUPED_VALUES_TRANSFORM_URN.equals(
+ getUrn(CombineComponents.COMBINE_GROUPED_VALUES)));
+
+ // SplittableParDoComponents
+ checkState(
+ SPLITTABLE_PAIR_WITH_RESTRICTION_URN.equals(
+ getUrn(SplittableParDoComponents.PAIR_WITH_RESTRICTION)));
+ checkState(
+ SPLITTABLE_SPLIT_RESTRICTION_URN.equals(
+ getUrn(SplittableParDoComponents.SPLIT_RESTRICTION)));
+ checkState(
+ SPLITTABLE_PROCESS_KEYED_URN.equals(
+ getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS)));
+ checkState(
+ SPLITTABLE_PROCESS_ELEMENTS_URN.equals(getUrn(SplittableParDoComponents.PROCESS_ELEMENTS)));
+ checkState(
+ SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN.equals(
+ getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS)));
+ checkState(
+ SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.equals(
+ getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS)));
+ }
private static final Collection<TransformTranslator<?>> KNOWN_TRANSLATORS =
loadKnownTranslators();