You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/18 08:04:21 UTC
[beam] branch master updated: Remove unused Flink assignWindows translator and workarounds.
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c27372 Remove unused Flink assignWindows translator and workarounds.
new 7e186f0 Merge pull request #8058 Remove unused Flink assignWindows translator and workarounds.
3c27372 is described below
commit 3c273724e432bfc4e55326cec61e2939fa4c16b5
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Mar 14 14:40:26 2019 +0100
Remove unused Flink assignWindows translator and workarounds.
---
.../core/construction/graph/PipelineTrimmer.java | 11 +----
.../FlinkBatchPortablePipelineTranslator.java | 51 ----------------------
.../FlinkStreamingPortablePipelineTranslator.java | 40 -----------------
3 files changed, 1 insertion(+), 101 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
index f3edaad..894a5d1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
@@ -20,9 +20,6 @@ package org.apache.beam.runners.core.construction.graph;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,13 +37,7 @@ public class PipelineTrimmer {
* @return the trimmed pipeline
*/
public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
- RunnerApi.Pipeline trimmedPipeline =
- makeKnownUrnsPrimitives(
- pipeline,
- // The fuser should fuse AssignWindows into the graph, so we don't handle it here.
- Sets.difference(
- knownUrns, ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
- return trimmedPipeline;
+ return makeKnownUrnsPrimitives(pipeline, knownUrns);
}
private static RunnerApi.Pipeline makeKnownUrnsPrimitives(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 496d737..745a784 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -48,7 +48,6 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
@@ -71,7 +70,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
@@ -147,9 +145,6 @@ public class FlinkBatchPortablePipelineTranslator
PTransformTranslation.IMPULSE_TRANSFORM_URN,
FlinkBatchPortablePipelineTranslator::translateImpulse);
translatorMap.put(
- PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
- FlinkBatchPortablePipelineTranslator::translateAssignWindows);
- translatorMap.put(
ExecutableStage.URN, FlinkBatchPortablePipelineTranslator::translateExecutableStage);
translatorMap.put(
PTransformTranslation.RESHUFFLE_URN,
@@ -286,52 +281,6 @@ public class FlinkBatchPortablePipelineTranslator
inputDataSet.rebalance());
}
- private static <T> void translateAssignWindows(
- PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
- RunnerApi.Components components = pipeline.getComponents();
- RunnerApi.WindowIntoPayload payload;
- try {
- payload =
- RunnerApi.WindowIntoPayload.parseFrom(transform.getTransform().getSpec().getPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalArgumentException(e);
- }
- // TODO: https://issues.apache.org/jira/browse/BEAM-4296
- // This only works for well known window fns, we should defer this execution to the SDK
- // if the WindowFn can't be parsed or just defer it all the time.
- WindowFn<T, ? extends BoundedWindow> windowFn =
- (WindowFn<T, ? extends BoundedWindow>)
- WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
-
- String inputCollectionId =
- Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
- String outputCollectionId =
- Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
- PCollectionNode collectionNode =
- PipelineNode.pCollection(
- outputCollectionId, components.getPcollectionsOrThrow(outputCollectionId));
- Coder<WindowedValue<T>> outputCoder;
- try {
- outputCoder = WireCoders.instantiateRunnerWireCoder(collectionNode, components);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- TypeInformation<WindowedValue<T>> resultTypeInfo = new CoderTypeInformation<>(outputCoder);
-
- DataSet<WindowedValue<T>> inputDataSet = context.getDataSetOrThrow(inputCollectionId);
-
- FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
- new FlinkAssignWindows<>(windowFn);
-
- DataSet<WindowedValue<T>> resultDataSet =
- inputDataSet
- .flatMap(assignWindowsFunction)
- .name(transform.getTransform().getUniqueName())
- .returns(resultTypeInfo);
-
- context.addDataSet(outputCollectionId, resultDataSet);
- }
-
private static <InputT> void translateExecutableStage(
PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
// TODO: Fail on splittable DoFns.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index a2c4f92..e6018d5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -45,7 +45,6 @@ import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -69,7 +68,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
@@ -193,8 +191,6 @@ public class FlinkStreamingPortablePipelineTranslator
translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey);
translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse);
translatorMap.put(STREAMING_IMPULSE_TRANSFORM_URN, this::translateStreamingImpulse);
- translatorMap.put(
- PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, this::translateAssignWindows);
translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle);
@@ -475,42 +471,6 @@ public class FlinkStreamingPortablePipelineTranslator
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source);
}
- private <T> void translateAssignWindows(
- String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
- RunnerApi.Components components = pipeline.getComponents();
- RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
- RunnerApi.WindowIntoPayload payload;
- try {
- payload = RunnerApi.WindowIntoPayload.parseFrom(transform.getSpec().getPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalArgumentException(e);
- }
- // TODO: https://issues.apache.org/jira/browse/BEAM-4296
- // This only works for well known window fns, we should defer this execution to the SDK
- // if the WindowFn can't be parsed or just defer it all the time.
- WindowFn<T, ? extends BoundedWindow> windowFn =
- (WindowFn<T, ? extends BoundedWindow>)
- WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
-
- String inputCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
- String outputCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
- Coder<WindowedValue<T>> outputCoder = instantiateCoder(outputCollectionId, components);
- TypeInformation<WindowedValue<T>> resultTypeInfo = new CoderTypeInformation<>(outputCoder);
-
- DataStream<WindowedValue<T>> inputDataStream = context.getDataStreamOrThrow(inputCollectionId);
-
- FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
- new FlinkAssignWindows<>(windowFn);
-
- DataStream<WindowedValue<T>> resultDataStream =
- inputDataStream
- .flatMap(assignWindowsFunction)
- .name(transform.getUniqueName())
- .returns(resultTypeInfo);
-
- context.addDataStream(outputCollectionId, resultDataStream);
- }
-
private <InputT, OutputT> void translateExecutableStage(
String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
// TODO: Fail on splittable DoFns.