You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/19 16:21:19 UTC
[beam] branch master updated: [BEAM-9535] Remove unused ParDoPayload.Parameters.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fa93450 [BEAM-9535] Remove unused ParDoPayload.Parameters.
new c16c7f9 Merge pull request #11150 from robertwb/no-parameter
fa93450 is described below
commit fa934508994bc77b364f53f0684b52f2d0dbf0f2
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Mar 17 17:46:34 2020 -0700
[BEAM-9535] Remove unused ParDoPayload.Parameters.
---
.../pipeline/src/main/proto/beam_runner_api.proto | 32 ------------
.../core/construction/ParDoTranslation.java | 57 ----------------------
.../runners/core/construction/SplittableParDo.java | 7 ---
.../dataflow/PrimitiveParDoSingleFactory.java | 7 ---
4 files changed, 103 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 7956730..6e84b23 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -433,11 +433,6 @@ message ParDoPayload {
// (Required) The FunctionSpec of the DoFn.
FunctionSpec do_fn = 1;
- // (Required) Additional pieces of context the DoFn may require that
- // are not otherwise represented in the payload.
- // (may force runners to execute the ParDo differently)
- repeated Parameter parameters = 2;
-
// (Optional) A mapping of local input names to side inputs, describing
// the expected access pattern.
map<string, SideInput> side_inputs = 3;
@@ -479,33 +474,6 @@ message ParDoPayload {
bool requires_stable_input = 11;
}
-// Parameters that a UDF might require.
-//
-// The details of how a runner sends these parameters to the SDK harness
-// are the subject of the Fn API.
-//
-// The details of how an SDK harness delivers them to the UDF is entirely
-// up to the SDK. (for some SDKs there may be parameters that are not
-// represented here if the runner doesn't need to do anything)
-//
-// Here, the parameters are simply indicators to the runner that they
-// need to run the function a particular way.
-//
-// TODO: the evolution of the Fn API will influence what needs explicit
-// representation here
-message Parameter {
- Type.Enum type = 1;
-
- message Type {
- enum Enum {
- UNSPECIFIED = 0;
- WINDOW = 1;
- PIPELINE_OPTIONS = 2;
- RESTRICTION_TRACKER = 3;
- }
- }
-}
-
message StateSpec {
oneof spec {
ReadModifyWriteStateSpec read_modify_write_spec = 1;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index da8dae0..86c95fa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -36,12 +36,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Parameter.Type;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -65,9 +63,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -222,12 +217,6 @@ public class ParDoTranslation {
}
@Override
- public List<RunnerApi.Parameter> translateParameters() {
- return ParDoTranslation.translateParameters(
- signature.processElement().extraParameters());
- }
-
- @Override
public Map<String, SideInput> translateSideInputs(SdkComponents components) {
Map<String, SideInput> sideInputs = new HashMap<>();
for (PCollectionView<?> sideInput : parDo.getSideInputs().values()) {
@@ -315,17 +304,6 @@ public class ParDoTranslation {
components);
}
- public static List<RunnerApi.Parameter> translateParameters(List<Parameter> params) {
- List<RunnerApi.Parameter> parameters = new ArrayList<>();
- for (Parameter parameter : params) {
- RunnerApi.Parameter protoParameter = translateParameter(parameter);
- if (protoParameter != null) {
- parameters.add(protoParameter);
- }
- }
- return parameters;
- }
-
public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
return doFnWithExecutionInformationFromProto(payload.getDoFn()).getDoFn();
}
@@ -671,38 +649,6 @@ public class ParDoTranslation {
SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn With Execution Info");
}
- /**
- * Translates a Java DoFn parameter to a proto representation.
- *
- * <p>Returns {@code null} rather than crashing for parameters that are not yet supported, to
- * allow legacy Java-based runners to perform a proto round-trip and afterwards use {@link
- * DoFnSignatures} to analyze.
- *
- * <p>The proto definition for parameters is provisional and those parameters that are not needed
- * for portability will be removed from the enum.
- */
- // Using nullability instead of optional because of shading
- public static @Nullable RunnerApi.Parameter translateParameter(Parameter parameter) {
- return parameter.match(
- new Cases.WithDefault</* @Nullable in Java 8 */ RunnerApi.Parameter>() {
- @Override
- public RunnerApi.Parameter dispatch(WindowParameter p) {
- return RunnerApi.Parameter.newBuilder().setType(Type.Enum.WINDOW).build();
- }
-
- @Override
- public RunnerApi.Parameter dispatch(RestrictionTrackerParameter p) {
- return RunnerApi.Parameter.newBuilder().setType(Type.Enum.RESTRICTION_TRACKER).build();
- }
-
- @Override
- // Java 7 + findbugs limitation. The return type is nullable.
- protected @Nullable RunnerApi.Parameter dispatchDefault(Parameter p) {
- return null;
- }
- });
- }
-
public static Map<String, SideInput> translateSideInputs(
List<PCollectionView<?>> views, SdkComponents components) {
Map<String, SideInput> sideInputs = new HashMap<>();
@@ -767,8 +713,6 @@ public class ParDoTranslation {
public interface ParDoLike {
FunctionSpec translateDoFn(SdkComponents newComponents);
- List<RunnerApi.Parameter> translateParameters();
-
Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components);
Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
@@ -792,7 +736,6 @@ public class ParDoTranslation {
return ParDoPayload.newBuilder()
.setDoFn(parDo.translateDoFn(components))
- .addAllParameters(parDo.translateParameters())
.putAllStateSpecs(parDo.translateStateSpecs(components))
.putAllTimerSpecs(parDo.translateTimerSpecs(components))
.putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 84ed10a..1fb63ce 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Parameter;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
@@ -377,12 +376,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
- public List<Parameter> translateParameters() {
- return ParDoTranslation.translateParameters(
- signature.processElement().extraParameters());
- }
-
- @Override
public Map<String, SideInput> translateSideInputs(SdkComponents components) {
return ParDoTranslation.translateSideInputs(pke.getSideInputs(), components);
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 2f99372..aab4887 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -30,7 +30,6 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -206,12 +205,6 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
}
@Override
- public List<RunnerApi.Parameter> translateParameters() {
- return ParDoTranslation.translateParameters(
- signature.processElement().extraParameters());
- }
-
- @Override
public Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components) {
return ParDoTranslation.translateSideInputs(
parDo.getSideInputs().values().stream().collect(Collectors.toList()), components);