You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/19 16:21:19 UTC

[beam] branch master updated: [BEAM-9535] Remove unused ParDoPayload.Parameters.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fa93450  [BEAM-9535] Remove unused ParDoPayload.Parameters.
     new c16c7f9  Merge pull request #11150 from robertwb/no-parameter
fa93450 is described below

commit fa934508994bc77b364f53f0684b52f2d0dbf0f2
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Mar 17 17:46:34 2020 -0700

    [BEAM-9535] Remove unused ParDoPayload.Parameters.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  | 32 ------------
 .../core/construction/ParDoTranslation.java        | 57 ----------------------
 .../runners/core/construction/SplittableParDo.java |  7 ---
 .../dataflow/PrimitiveParDoSingleFactory.java      |  7 ---
 4 files changed, 103 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 7956730..6e84b23 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -433,11 +433,6 @@ message ParDoPayload {
   // (Required) The FunctionSpec of the DoFn.
   FunctionSpec do_fn = 1;
 
-  // (Required) Additional pieces of context the DoFn may require that
-  // are not otherwise represented in the payload.
-  // (may force runners to execute the ParDo differently)
-  repeated Parameter parameters = 2;
-
   // (Optional) A mapping of local input names to side inputs, describing
   // the expected access pattern.
   map<string, SideInput> side_inputs = 3;
@@ -479,33 +474,6 @@ message ParDoPayload {
   bool requires_stable_input = 11;
 }
 
-// Parameters that a UDF might require.
-//
-// The details of how a runner sends these parameters to the SDK harness
-// are the subject of the Fn API.
-//
-// The details of how an SDK harness delivers them to the UDF is entirely
-// up to the SDK. (for some SDKs there may be parameters that are not
-// represented here if the runner doesn't need to do anything)
-//
-// Here, the parameters are simply indicators to the runner that they
-// need to run the function a particular way.
-//
-// TODO: the evolution of the Fn API will influence what needs explicit
-// representation here
-message Parameter {
-  Type.Enum type = 1;
-
-  message Type {
-    enum Enum {
-      UNSPECIFIED = 0;
-      WINDOW = 1;
-      PIPELINE_OPTIONS = 2;
-      RESTRICTION_TRACKER = 3;
-    }
-  }
-}
-
 message StateSpec {
   oneof spec {
     ReadModifyWriteStateSpec read_modify_write_spec = 1;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index da8dae0..86c95fa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -36,12 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Parameter.Type;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -65,9 +63,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -222,12 +217,6 @@ public class ParDoTranslation {
           }
 
           @Override
-          public List<RunnerApi.Parameter> translateParameters() {
-            return ParDoTranslation.translateParameters(
-                signature.processElement().extraParameters());
-          }
-
-          @Override
           public Map<String, SideInput> translateSideInputs(SdkComponents components) {
             Map<String, SideInput> sideInputs = new HashMap<>();
             for (PCollectionView<?> sideInput : parDo.getSideInputs().values()) {
@@ -315,17 +304,6 @@ public class ParDoTranslation {
         components);
   }
 
-  public static List<RunnerApi.Parameter> translateParameters(List<Parameter> params) {
-    List<RunnerApi.Parameter> parameters = new ArrayList<>();
-    for (Parameter parameter : params) {
-      RunnerApi.Parameter protoParameter = translateParameter(parameter);
-      if (protoParameter != null) {
-        parameters.add(protoParameter);
-      }
-    }
-    return parameters;
-  }
-
   public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
     return doFnWithExecutionInformationFromProto(payload.getDoFn()).getDoFn();
   }
@@ -671,38 +649,6 @@ public class ParDoTranslation {
         SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn With Execution Info");
   }
 
-  /**
-   * Translates a Java DoFn parameter to a proto representation.
-   *
-   * <p>Returns {@code null} rather than crashing for parameters that are not yet supported, to
-   * allow legacy Java-based runners to perform a proto round-trip and afterwards use {@link
-   * DoFnSignatures} to analyze.
-   *
-   * <p>The proto definition for parameters is provisional and those parameters that are not needed
-   * for portability will be removed from the enum.
-   */
-  // Using nullability instead of optional because of shading
-  public static @Nullable RunnerApi.Parameter translateParameter(Parameter parameter) {
-    return parameter.match(
-        new Cases.WithDefault</* @Nullable in Java 8 */ RunnerApi.Parameter>() {
-          @Override
-          public RunnerApi.Parameter dispatch(WindowParameter p) {
-            return RunnerApi.Parameter.newBuilder().setType(Type.Enum.WINDOW).build();
-          }
-
-          @Override
-          public RunnerApi.Parameter dispatch(RestrictionTrackerParameter p) {
-            return RunnerApi.Parameter.newBuilder().setType(Type.Enum.RESTRICTION_TRACKER).build();
-          }
-
-          @Override
-          // Java 7 + findbugs limitation. The return type is nullable.
-          protected @Nullable RunnerApi.Parameter dispatchDefault(Parameter p) {
-            return null;
-          }
-        });
-  }
-
   public static Map<String, SideInput> translateSideInputs(
       List<PCollectionView<?>> views, SdkComponents components) {
     Map<String, SideInput> sideInputs = new HashMap<>();
@@ -767,8 +713,6 @@ public class ParDoTranslation {
   public interface ParDoLike {
     FunctionSpec translateDoFn(SdkComponents newComponents);
 
-    List<RunnerApi.Parameter> translateParameters();
-
     Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components);
 
     Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
@@ -792,7 +736,6 @@ public class ParDoTranslation {
 
     return ParDoPayload.newBuilder()
         .setDoFn(parDo.translateDoFn(components))
-        .addAllParameters(parDo.translateParameters())
         .putAllStateSpecs(parDo.translateStateSpecs(components))
         .putAllTimerSpecs(parDo.translateTimerSpecs(components))
         .putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 84ed10a..1fb63ce 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Parameter;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
@@ -377,12 +376,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
                 }
 
                 @Override
-                public List<Parameter> translateParameters() {
-                  return ParDoTranslation.translateParameters(
-                      signature.processElement().extraParameters());
-                }
-
-                @Override
                 public Map<String, SideInput> translateSideInputs(SdkComponents components) {
                   return ParDoTranslation.translateSideInputs(pke.getSideInputs(), components);
                 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 2f99372..aab4887 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -30,7 +30,6 @@ import com.google.auto.service.AutoService;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -206,12 +205,6 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
             }
 
             @Override
-            public List<RunnerApi.Parameter> translateParameters() {
-              return ParDoTranslation.translateParameters(
-                  signature.processElement().extraParameters());
-            }
-
-            @Override
             public Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components) {
               return ParDoTranslation.translateSideInputs(
                   parDo.getSideInputs().values().stream().collect(Collectors.toList()), components);