You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:12 UTC
[38/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples
package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
deleted file mode 100644
index 681459a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-
-/**
- * AutoService registrar - will register FlinkRunner and FlinkOptions
- * as possible pipeline runner services.
- *
- * <p>It ends up in META-INF/services and gets picked up by Beam.
- *
- */
-public class FlinkRunnerRegistrar {
- private FlinkRunnerRegistrar() { }
-
- /**
- * Pipeline runner registrar.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- FlinkRunner.class,
- TestFlinkRunner.class);
- }
- }
-
- /**
- * Pipeline options registrar.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
deleted file mode 100644
index 0682b56..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.joda.time.Duration;
-
-/**
- * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
- * has methods to query the job runtime and the final values of
- * {@link org.apache.beam.sdk.transforms.Aggregator}s.
- */
-public class FlinkRunnerResult implements PipelineResult {
-
- private final Map<String, Object> aggregators;
-
- private final long runtime;
-
- FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
- this.aggregators = (aggregators == null || aggregators.isEmpty())
- ? Collections.<String, Object>emptyMap()
- : Collections.unmodifiableMap(aggregators);
- this.runtime = runtime;
- }
-
- @Override
- public State getState() {
- return State.DONE;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- // TODO provide a list of all accumulator step values
- Object value = aggregators.get(aggregator.getName());
- if (value != null) {
- return new AggregatorValues<T>() {
- @Override
- public Map<String, T> getValuesAtSteps() {
- return (Map<String, T>) aggregators;
- }
- };
- } else {
- throw new AggregatorRetrievalException("Accumulator results not found.",
- new RuntimeException("Accumulator does not exist."));
- }
- }
-
- @Override
- public String toString() {
- return "FlinkRunnerResult{"
- + "aggregators=" + aggregators
- + ", runtime=" + runtime
- + '}';
- }
-
- @Override
- public State cancel() throws IOException {
- throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
- }
-
- @Override
- public State waitUntilFinish() {
- return State.DONE;
- }
-
- @Override
- public State waitUntilFinish(Duration duration) {
- return State.DONE;
- }
-
- @Override
- public MetricResults metrics() {
- throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
deleted file mode 100644
index 0459ef7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
- * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
- * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- *
- */
-class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
-
- /** The necessary context in the case of a streaming job. */
- private final FlinkStreamingTranslationContext streamingContext;
-
- private int depth = 0;
-
- private FlinkRunner flinkRunner;
-
- public FlinkStreamingPipelineTranslator(
- FlinkRunner flinkRunner,
- StreamExecutionEnvironment env,
- PipelineOptions options) {
- this.streamingContext = new FlinkStreamingTranslationContext(env, options);
- this.flinkRunner = flinkRunner;
- }
-
- @Override
- public void translate(Pipeline pipeline) {
- List<PTransformOverride> transformOverrides =
- ImmutableList.<PTransformOverride>builder()
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(),
- new SplittableParDoOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
- // this has to be last since the ViewAsSingleton override
- // can expand to a Combine.GloballyAsSingletonView
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new ReflectiveOneToOneOverrideFactory(
- FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
- flinkRunner)))
- .build();
-
- pipeline.replaceAll(transformOverrides);
- super.translate(pipeline);
- }
-
- // --------------------------------------------------------------------------------------------
- // Pipeline Visitor Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
- this.depth++;
-
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null) {
- StreamTransformTranslator<?> translator =
- FlinkStreamingTransformTranslators.getTranslator(transform);
-
- if (translator != null && applyCanTranslate(transform, node, translator)) {
- applyStreamingTransform(transform, node, translator);
- LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
- }
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- this.depth--;
- LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
- // get the transformation corresponding to the node we are
- // currently visiting and translate it into its Flink alternative.
-
- PTransform<?, ?> transform = node.getTransform();
- StreamTransformTranslator<?> translator =
- FlinkStreamingTransformTranslators.getTranslator(transform);
-
- if (translator == null || !applyCanTranslate(transform, node, translator)) {
- LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException(
- "The transform " + transform + " is currently not supported.");
- }
- applyStreamingTransform(transform, node, translator);
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- // do nothing here
- }
-
- private <T extends PTransform<?, ?>> void applyStreamingTransform(
- PTransform<?, ?> transform,
- TransformHierarchy.Node node,
- StreamTransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
- // create the applied PTransform on the streamingContext
- streamingContext.setCurrentTransform(node.toAppliedPTransform());
- typedTranslator.translateNode(typedTransform, streamingContext);
- }
-
- private <T extends PTransform<?, ?>> boolean applyCanTranslate(
- PTransform<?, ?> transform,
- TransformHierarchy.Node node,
- StreamTransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
- streamingContext.setCurrentTransform(node.toAppliedPTransform());
-
- return typedTranslator.canTranslate(typedTransform, streamingContext);
- }
-
- /**
- * The interface that every Flink translator of a Beam operator should implement.
- * This interface is for <b>streaming</b> jobs. For examples of such translators see
- * {@link FlinkStreamingTransformTranslators}.
- */
- abstract static class StreamTransformTranslator<T extends PTransform> {
-
- /**
- * Translate the given transform.
- */
- abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
-
- /**
- * Returns true iff this translator can translate the given transform.
- */
- boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
- return true;
- }
- }
-
- private static class ReflectiveOneToOneOverrideFactory<
- InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
- extends SingleInputOutputOverrideFactory<
- PCollection<InputT>, PCollection<OutputT>, TransformT> {
- private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
- private final FlinkRunner runner;
-
- private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
- FlinkRunner runner) {
- this.replacement = replacement;
- this.runner = runner;
- }
-
- @Override
- public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- InstanceBuilder.ofType(replacement)
- .withArg(FlinkRunner.class, runner)
- .withArg(
- (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
- transform.getTransform().getClass(),
- transform.getTransform())
- .build());
- }
- }
-
- /**
- * A {@link PTransformOverrideFactory} that overrides a <a
- * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
- */
- static class SplittableParDoOverrideFactory<InputT, OutputT>
- implements PTransformOverrideFactory<
- PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
- @Override
- public PTransformReplacement<PCollection<InputT>, PCollectionTuple>
- getReplacementTransform(
- AppliedPTransform<
- PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
- transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- new SplittableParDo<>(transform.getTransform()));
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
- return ReplacementOutputs.tagged(outputs, newOutput);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
deleted file mode 100644
index 123d5e7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.runners.core.ElementAndRestriction;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.join.UnionCoder;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class contains all the mappings between Beam and Flink
- * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
- * traverses the Beam job and comes here to translate the encountered Beam transformations
- * into Flink ones, based on the mapping available in this class.
- */
-class FlinkStreamingTransformTranslators {
-
- // --------------------------------------------------------------------------------------------
- // Transform Translator Registry
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("rawtypes")
- private static final Map<
- Class<? extends PTransform>,
- FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
-
- // here you can find all the available translators.
- static {
- TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
- TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
- TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
- TRANSLATORS.put(
- SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
- TRANSLATORS.put(
- SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
-
-
- TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
- TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
- TRANSLATORS.put(
- FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
- new CreateViewStreamingTranslator());
-
- TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
- TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
- }
-
- public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
- PTransform<?, ?> transform) {
- return TRANSLATORS.get(transform.getClass());
- }
-
- // --------------------------------------------------------------------------------------------
- // Transformation Implementations
- // --------------------------------------------------------------------------------------------
-
- private static class TextIOWriteBoundStreamingTranslator
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
- @Override
- public void translateNode(
- TextIO.Write.Bound transform,
- FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
- DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- boolean needsValidation = transform.needsValidation();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn(
- "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
- needsValidation);
- LOG.warn(
- "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
- filenameSuffix);
- LOG.warn(
- "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
- shardNameTemplate);
-
- DataStream<String> dataSink = inputDataStream
- .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
- @Override
- public void flatMap(
- WindowedValue<String> value,
- Collector<String> out)
- throws Exception {
- out.collect(value.getValue());
- }
- });
- DataStreamSink<String> output =
- dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
- if (numShards > 0) {
- output.setParallelism(numShards);
- }
- }
- }
-
- private static class UnboundedReadSourceTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
-
- @Override
- public void translateNode(
- Read.Unbounded<T> transform,
- FlinkStreamingTranslationContext context) {
- PCollection<T> output = context.getOutput(transform);
-
- TypeInformation<WindowedValue<T>> outputTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- DataStream<WindowedValue<T>> source;
- try {
- UnboundedSourceWrapper<T, ?> sourceWrapper =
- new UnboundedSourceWrapper<>(
- context.getPipelineOptions(),
- transform.getSource(),
- context.getExecutionEnvironment().getParallelism());
- source = context
- .getExecutionEnvironment()
- .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
- } catch (Exception e) {
- throw new RuntimeException(
- "Error while translating UnboundedSource: " + transform.getSource(), e);
- }
-
- context.setOutputDataStream(output, source);
- }
- }
-
- private static class BoundedReadSourceTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
-
- @Override
- public void translateNode(
- Read.Bounded<T> transform,
- FlinkStreamingTranslationContext context) {
- PCollection<T> output = context.getOutput(transform);
-
- TypeInformation<WindowedValue<T>> outputTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
-
- DataStream<WindowedValue<T>> source;
- try {
- BoundedSourceWrapper<T> sourceWrapper =
- new BoundedSourceWrapper<>(
- context.getPipelineOptions(),
- transform.getSource(),
- context.getExecutionEnvironment().getParallelism());
- source = context
- .getExecutionEnvironment()
- .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
- } catch (Exception e) {
- throw new RuntimeException(
- "Error while translating BoundedSource: " + transform.getSource(), e);
- }
-
- context.setOutputDataStream(output, source);
- }
- }
-
- /**
- * Wraps each element in a {@link RawUnionValue} with the given tag id.
- */
- private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
- private final int intTag;
-
- public ToRawUnion(int intTag) {
- this.intTag = intTag;
- }
-
- @Override
- public RawUnionValue map(T o) throws Exception {
- return new RawUnionValue(intTag, o);
- }
- }
-
- private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
- transformSideInputs(
- Collection<PCollectionView<?>> sideInputs,
- FlinkStreamingTranslationContext context) {
-
- // collect all side inputs
- Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
- Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
- int count = 0;
- for (PCollectionView<?> sideInput: sideInputs) {
- TupleTag<?> tag = sideInput.getTagInternal();
- intToViewMapping.put(count, sideInput);
- tagToIntMapping.put(tag, count);
- count++;
- Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
- }
-
-
- List<Coder<?>> inputCoders = new ArrayList<>();
- for (PCollectionView<?> sideInput: sideInputs) {
- DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
- TypeInformation<Object> tpe = sideInputStream.getType();
- if (!(tpe instanceof CoderTypeInformation)) {
- throw new IllegalStateException(
- "Input Stream TypeInformation is no CoderTypeInformation.");
- }
-
- Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
- inputCoders.add(coder);
- }
-
- UnionCoder unionCoder = UnionCoder.of(inputCoders);
-
- CoderTypeInformation<RawUnionValue> unionTypeInformation =
- new CoderTypeInformation<>(unionCoder);
-
- // transform each side input to RawUnionValue and union them
- DataStream<RawUnionValue> sideInputUnion = null;
-
- for (PCollectionView<?> sideInput: sideInputs) {
- TupleTag<?> tag = sideInput.getTagInternal();
- final int intTag = tagToIntMapping.get(tag);
- DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
- DataStream<RawUnionValue> unionValueStream =
- sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
-
- if (sideInputUnion == null) {
- sideInputUnion = unionValueStream;
- } else {
- sideInputUnion = sideInputUnion.union(unionValueStream);
- }
- }
-
- if (sideInputUnion == null) {
- throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
- }
-
- return new Tuple2<>(intToViewMapping, sideInputUnion);
- }
-
- /**
- * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
- */
- static class ParDoTranslationHelper {
-
- interface DoFnOperatorFactory<InputT, OutputT> {
- DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
- DoFn<InputT, OutputT> doFn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- FlinkStreamingTranslationContext context,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
- Coder<WindowedValue<InputT>> inputCoder,
- Coder keyCoder,
- Map<Integer, PCollectionView<?>> transformedSideInputs);
- }
-
- static <InputT, OutputT> void translateParDo(
- String transformName,
- DoFn<InputT, OutputT> doFn,
- PCollection<InputT> input,
- List<PCollectionView<?>> sideInputs,
- Map<TupleTag<?>, PValue> outputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- FlinkStreamingTranslationContext context,
- DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
-
- // we assume that the transformation does not change the windowing strategy.
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- Map<TupleTag<?>, Integer> tagsToLabels =
- transformTupleTagsToLabels(mainOutputTag, outputs);
-
- SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
-
- Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
-
- DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
-
- Coder keyCoder = null;
- boolean stateful = false;
- DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- // Based on the fact that the signature is stateful, DoFnSignatures ensures
- // that it is also keyed
- keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
- inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
- stateful = true;
- } else if (doFn instanceof SplittableParDo.ProcessFn) {
- // we know that it is keyed on String
- keyCoder = StringUtf8Coder.of();
- stateful = true;
- }
-
- if (sideInputs.isEmpty()) {
- DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
- doFnOperatorFactory.createDoFnOperator(
- doFn,
- sideInputs,
- mainOutputTag,
- additionalOutputTags,
- context,
- windowingStrategy,
- tagsToLabels,
- inputCoder,
- keyCoder,
- new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
-
- UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
- CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
- new CoderTypeInformation<>(outputUnionCoder);
-
- unionOutputStream = inputDataStream
- .transform(transformName, outputUnionTypeInformation, doFnOperator);
-
- } else {
- Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
- transformSideInputs(sideInputs, context);
-
- DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
- doFnOperatorFactory.createDoFnOperator(
- doFn,
- sideInputs,
- mainOutputTag,
- additionalOutputTags,
- context,
- windowingStrategy,
- tagsToLabels,
- inputCoder,
- keyCoder,
- transformedSideInputs.f0);
-
- UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
- CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
- new CoderTypeInformation<>(outputUnionCoder);
-
- if (stateful) {
- // we have to manually contruct the two-input transform because we're not
- // allowed to have only one input keyed, normally.
- KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
- TwoInputTransformation<
- WindowedValue<KV<?, InputT>>,
- RawUnionValue,
- WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
- keyedStream.getTransformation(),
- transformedSideInputs.f1.broadcast().getTransformation(),
- transformName,
- (TwoInputStreamOperator) doFnOperator,
- outputUnionTypeInformation,
- keyedStream.getParallelism());
-
- rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
- rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
-
- unionOutputStream = new SingleOutputStreamOperator(
- keyedStream.getExecutionEnvironment(),
- rawFlinkTransform) {}; // we have to cheat around the ctor being protected
-
- keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
-
- } else {
- unionOutputStream = inputDataStream
- .connect(transformedSideInputs.f1.broadcast())
- .transform(transformName, outputUnionTypeInformation, doFnOperator);
- }
- }
-
- SplitStream<RawUnionValue> splitStream = unionOutputStream
- .split(new OutputSelector<RawUnionValue>() {
- @Override
- public Iterable<String> select(RawUnionValue value) {
- return Collections.singletonList(Integer.toString(value.getUnionTag()));
- }
- });
-
- for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
- final int outputTag = tagsToLabels.get(output.getKey());
-
- TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
-
- @SuppressWarnings("unchecked")
- DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
- .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
- @Override
- public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
- out.collect(value.getValue());
- }
- }).returns(outputTypeInfo);
-
- context.setOutputDataStream(output.getValue(), unwrapped);
- }
- }
-
- private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
- TupleTag<?> mainTag,
- Map<TupleTag<?>, PValue> allTaggedValues) {
-
- Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
- int count = 0;
- tagToLabelMap.put(mainTag, count++);
- for (TupleTag<?> key : allTaggedValues.keySet()) {
- if (!tagToLabelMap.containsKey(key)) {
- tagToLabelMap.put(key, count++);
- }
- }
- return tagToLabelMap;
- }
-
- private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PValue taggedColl : taggedCollections.values()) {
- checkArgument(
- taggedColl instanceof PCollection,
- "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
- PCollection.class.getSimpleName(),
- taggedColl.getClass().getSimpleName());
- PCollection<?> coll = (PCollection<?>) taggedColl;
- WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
- WindowedValue.getFullCoder(
- coll.getCoder(),
- coll.getWindowingStrategy().getWindowFn().windowCoder());
- outputCoders.add(windowedValueCoder);
- }
- return UnionCoder.of(outputCoders);
- }
- }
-
- private static class ParDoStreamingTranslator<InputT, OutputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- public void translateNode(
- ParDo.MultiOutput<InputT, OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- ParDoTranslationHelper.translateParDo(
- transform.getName(),
- transform.getFn(),
- (PCollection<InputT>) context.getInput(transform),
- transform.getSideInputs(),
- context.getOutputs(transform),
- transform.getMainOutputTag(),
- transform.getAdditionalOutputTags().getAll(),
- context,
- new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
- @Override
- public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
- DoFn<InputT, OutputT> doFn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- FlinkStreamingTranslationContext context,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
- Coder<WindowedValue<InputT>> inputCoder,
- Coder keyCoder,
- Map<Integer, PCollectionView<?>> transformedSideInputs) {
- return new DoFnOperator<>(
- doFn,
- inputCoder,
- mainOutputTag,
- additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
- windowingStrategy,
- transformedSideInputs,
- sideInputs,
- context.getPipelineOptions(),
- keyCoder);
- }
- });
- }
- }
-
- private static class SplittableProcessElementsStreamingTranslator<
- InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
-
- @Override
- public void translateNode(
- SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
- FlinkStreamingTranslationContext context) {
-
- ParDoTranslationHelper.translateParDo(
- transform.getName(),
- transform.newProcessFn(transform.getFn()),
- (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
- context.getInput(transform),
- transform.getSideInputs(),
- context.getOutputs(transform),
- transform.getMainOutputTag(),
- transform.getAdditionalOutputTags().getAll(),
- context,
- new ParDoTranslationHelper.DoFnOperatorFactory<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
- @Override
- public DoFnOperator<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
- OutputT,
- RawUnionValue> createDoFnOperator(
- DoFn<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
- OutputT> doFn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- FlinkStreamingTranslationContext context,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
- Coder<
- WindowedValue<
- KeyedWorkItem<
- String,
- ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
- Coder keyCoder,
- Map<Integer, PCollectionView<?>> transformedSideInputs) {
- return new SplittableDoFnOperator<>(
- doFn,
- inputCoder,
- mainOutputTag,
- additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
- windowingStrategy,
- transformedSideInputs,
- sideInputs,
- context.getPipelineOptions(),
- keyCoder);
- }
- });
- }
- }
-
- private static class CreateViewStreamingTranslator<ElemT, ViewT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
-
- @Override
- public void translateNode(
- FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
- FlinkStreamingTranslationContext context) {
- // just forward
- DataStream<WindowedValue<List<ElemT>>> inputDataSet =
- context.getInputDataStream(context.getInput(transform));
-
- PCollectionView<ViewT> view = context.getOutput(transform);
-
- context.setOutputDataStream(view, inputDataSet);
- }
- }
-
- private static class WindowAssignTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
-
- @Override
- public void translateNode(
- Window.Assign<T> transform,
- FlinkStreamingTranslationContext context) {
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<T, BoundedWindow> windowingStrategy =
- (WindowingStrategy<T, BoundedWindow>)
- context.getOutput(transform).getWindowingStrategy();
-
- TypeInformation<WindowedValue<T>> typeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- DataStream<WindowedValue<T>> inputDataStream =
- context.getInputDataStream(context.getInput(transform));
-
- WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
- FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
- new FlinkAssignWindows<>(windowFn);
-
- SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
- .flatMap(assignWindowsFunction)
- .name(context.getOutput(transform).getName())
- .returns(typeInfo);
-
- context.setOutputDataStream(context.getOutput(transform), outputDataStream);
- }
- }
-
- private static class ReshuffleTranslatorStreaming<K, InputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
-
- @Override
- public void translateNode(
- Reshuffle<K, InputT> transform,
- FlinkStreamingTranslationContext context) {
-
- DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
- context.getInputDataStream(context.getInput(transform));
-
- context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
-
- }
- }
-
-
- private static class GroupByKeyTranslator<K, InputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
-
- @Override
- public void translateNode(
- GroupByKey<K, InputT> transform,
- FlinkStreamingTranslationContext context) {
-
- PCollection<KV<K, InputT>> input = context.getInput(transform);
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, BoundedWindow> windowingStrategy =
- (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
- KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
- SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
- inputKvCoder.getKeyCoder(),
- inputKvCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
- DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
- WindowedValue.
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
- WindowedValue.getFullCoder(
- workItemCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
- CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
-
- DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
- inputDataStream
- .flatMap(new ToKeyedWorkItem<K, InputT>())
- .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
- KeyedStream<
- WindowedValue<
- SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
- .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
- SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
- SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-
- TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- DoFnOperator.DefaultOutputManagerFactory<
- WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
- new DoFnOperator.DefaultOutputManagerFactory<>();
-
- WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
- new WindowDoFnOperator<>(
- reduceFn,
- (Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, Iterable<InputT>>>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- outputManagerFactory,
- windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder());
-
- // our operator excepts WindowedValue<KeyedWorkItem> while our input stream
- // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
- @SuppressWarnings("unchecked")
- SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
- keyedWorkItemStream
- .transform(
- transform.getName(),
- outputTypeInfo,
- (OneInputStreamOperator) doFnOperator);
-
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
-
- }
- }
-
- private static class CombinePerKeyTranslator<K, InputT, OutputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- Combine.PerKey<K, InputT, OutputT>> {
-
- @Override
- boolean canTranslate(
- Combine.PerKey<K, InputT, OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- // if we have a merging window strategy and side inputs we cannot
- // translate as a proper combine. We have to group and then run the combine
- // over the final grouped values.
- PCollection<KV<K, InputT>> input = context.getInput(transform);
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, BoundedWindow> windowingStrategy =
- (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
- return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
- }
-
- @Override
- public void translateNode(
- Combine.PerKey<K, InputT, OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- PCollection<KV<K, InputT>> input = context.getInput(transform);
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, BoundedWindow> windowingStrategy =
- (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
- KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
- SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
- inputKvCoder.getKeyCoder(),
- inputKvCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
- DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
- WindowedValue.
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
- WindowedValue.getFullCoder(
- workItemCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
- CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
-
- DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
- inputDataStream
- .flatMap(new ToKeyedWorkItem<K, InputT>())
- .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
- KeyedStream<
- WindowedValue<
- SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
- .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
- SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
- inputKvCoder.getKeyCoder(),
- AppliedCombineFn.withInputCoder(
- transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
-
- TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
- if (sideInputs.isEmpty()) {
-
- WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
- new WindowDoFnOperator<>(
- reduceFn,
- (Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, OutputT>>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
- windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder());
-
- // our operator excepts WindowedValue<KeyedWorkItem> while our input stream
- // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
- @SuppressWarnings("unchecked")
- SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
- keyedWorkItemStream.transform(
- transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
-
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
- } else {
- Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
- transformSideInputs(sideInputs, context);
-
- WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
- new WindowDoFnOperator<>(
- reduceFn,
- (Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, OutputT>>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
- windowingStrategy,
- transformSideInputs.f0,
- sideInputs,
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder());
-
- // we have to manually contruct the two-input transform because we're not
- // allowed to have only one input keyed, normally.
-
- TwoInputTransformation<
- WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
- RawUnionValue,
- WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
- keyedWorkItemStream.getTransformation(),
- transformSideInputs.f1.broadcast().getTransformation(),
- transform.getName(),
- (TwoInputStreamOperator) doFnOperator,
- outputTypeInfo,
- keyedWorkItemStream.getParallelism());
-
- rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
- rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
- new SingleOutputStreamOperator(
- keyedWorkItemStream.getExecutionEnvironment(),
- rawFlinkTransform) {}; // we have to cheat around the ctor being protected
-
- keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
-
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
- }
- }
- }
-
- private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
-
- @Override
- boolean canTranslate(
- SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
- FlinkStreamingTranslationContext context) {
- return true;
- }
-
- @Override
- public void translateNode(
- SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
- FlinkStreamingTranslationContext context) {
-
- PCollection<KV<K, InputT>> input = context.getInput(transform);
-
- KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
- SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
- inputKvCoder.getKeyCoder(),
- inputKvCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
-
- WindowedValue.
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
- WindowedValue.getFullCoder(
- workItemCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
-
- CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
- new CoderTypeInformation<>(windowedWorkItemCoder);
-
- DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
- DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
- inputDataStream
- .flatMap(new ToKeyedWorkItem<K, InputT>())
- .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
- KeyedStream<
- WindowedValue<
- SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
- .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
- context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
- }
- }
-
- private static class FlattenPCollectionTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- Flatten.PCollections<T>> {
-
- @Override
- public void translateNode(
- Flatten.PCollections<T> transform,
- FlinkStreamingTranslationContext context) {
- Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
-
- if (allInputs.isEmpty()) {
-
- // create an empty dummy source to satisfy downstream operations
- // we cannot create an empty source in Flink, therefore we have to
- // add the flatMap that simply never forwards the single element
- DataStreamSource<String> dummySource =
- context.getExecutionEnvironment().fromElements("dummy");
-
- DataStream<WindowedValue<T>> result = dummySource.flatMap(
- new FlatMapFunction<String, WindowedValue<T>>() {
- @Override
- public void flatMap(
- String s,
- Collector<WindowedValue<T>> collector) throws Exception {
- // never return anything
- }
- }).returns(
- new CoderTypeInformation<>(
- WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
- GlobalWindow.Coder.INSTANCE)));
- context.setOutputDataStream(context.getOutput(transform), result);
-
- } else {
- DataStream<T> result = null;
- for (PValue input : allInputs.values()) {
- DataStream<T> current = context.getInputDataStream(input);
- result = (result == null) ? current : result.union(current);
- }
- context.setOutputDataStream(context.getOutput(transform), result);
- }
- }
- }
-
- private static class ToKeyedWorkItem<K, InputT>
- extends RichFlatMapFunction<
- WindowedValue<KV<K, InputT>>,
- WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
-
- @Override
- public void flatMap(
- WindowedValue<KV<K, InputT>> inWithMultipleWindows,
- Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
-
- // we need to wrap each one work item per window for now
- // since otherwise the PushbackSideInputRunner will not correctly
- // determine whether side inputs are ready
- //
- // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
- for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
- SingletonKeyedWorkItem<K, InputT> workItem =
- new SingletonKeyedWorkItem<>(
- in.getValue().getKey(),
- in.withValue(in.getValue().getValue()));
-
- out.collect(in.withValue(workItem));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
deleted file mode 100644
index 1a943a3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Helper for keeping track of which {@link DataStream DataStreams} map
- * to which {@link PTransform PTransforms}.
- */
-class FlinkStreamingTranslationContext {
-
- private final StreamExecutionEnvironment env;
- private final PipelineOptions options;
-
- /**
- * Keeps a mapping between the output value of the PTransform (in Dataflow) and the
- * Flink Operator that produced it, after the translation of the correspondinf PTransform
- * to its Flink equivalent.
- * */
- private final Map<PValue, DataStream<?>> dataStreams;
-
- private AppliedPTransform<?, ?, ?> currentTransform;
-
- public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
- this.env = checkNotNull(env);
- this.options = checkNotNull(options);
- this.dataStreams = new HashMap<>();
- }
-
- public StreamExecutionEnvironment getExecutionEnvironment() {
- return env;
- }
-
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @SuppressWarnings("unchecked")
- public <T> DataStream<T> getInputDataStream(PValue value) {
- return (DataStream<T>) dataStreams.get(value);
- }
-
- public void setOutputDataStream(PValue value, DataStream<?> set) {
- if (!dataStreams.containsKey(value)) {
- dataStreams.put(value, set);
- }
- }
-
- /**
- * Sets the AppliedPTransform which carries input/output.
- * @param currentTransform
- */
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
- this.currentTransform = currentTransform;
- }
-
- public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
- Coder<T> valueCoder = collection.getCoder();
-
- return WindowedValue.getFullCoder(
- valueCoder,
- collection.getWindowingStrategy().getWindowFn().windowCoder());
- }
-
- @SuppressWarnings("unchecked")
- public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
- Coder<T> valueCoder = collection.getCoder();
- WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
- WindowedValue.getFullCoder(
- valueCoder,
- collection.getWindowingStrategy().getWindowFn().windowCoder());
-
- return new CoderTypeInformation<>(windowedValueCoder);
- }
-
-
- @SuppressWarnings("unchecked")
- public <T extends PValue> T getInput(PTransform<T, ?> transform) {
- return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
- }
-
- public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
- return currentTransform.getInputs();
- }
-
- @SuppressWarnings("unchecked")
- public <T extends PValue> T getOutput(PTransform<?, T> transform) {
- return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
- }
-
- public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
- PTransform<?, OutputT> transform) {
- return currentTransform.getOutputs();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
deleted file mode 100644
index f955f2a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Flink streaming overrides for various view (side input) transforms.
- */
-class FlinkStreamingViewOverrides {
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Flink runner in streaming mode.
- */
- static class StreamingViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- private final transient FlinkRunner runner;
-
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * View.AsMultimap View.AsMultimap} for the
- * Flink runner in streaming mode.
- */
- static class StreamingViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- private final transient FlinkRunner runner;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsList View.AsList} for the
- * Flink runner in streaming mode.
- */
- static class StreamingViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsIterable View.AsIterable} for the
- * Flink runner in streaming mode.
- */
- static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized expansion for
- * {@link View.AsSingleton View.AsSingleton} for the
- * Flink runner in streaming mode.
- */
- static class StreamingViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private View.AsSingleton<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
- }
-
- static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingCombineGloballyAsSingletonView(
- FlinkRunner runner,
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined,
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<T>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateFlinkPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private PCollectionView<ViewT> view;
-
- private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateFlinkPCollectionView<>(view);
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
deleted file mode 100644
index 3acc3ea..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
- */
-class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
-
- private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
-
- private TranslationMode translationMode;
-
- private final FlinkPipelineOptions options;
-
- public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
- this.translationMode = defaultMode;
- this.options = options;
- }
-
- public TranslationMode getTranslationMode() {
-
- // override user-specified translation mode
- if (options.isStreaming()) {
- return TranslationMode.STREAMING;
- }
-
- return translationMode;
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {}
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- Class<? extends PTransform> transformClass = node.getTransform().getClass();
- if (transformClass == Read.Unbounded.class) {
- LOG.info("Found {}. Switching to streaming execution.", transformClass);
- translationMode = TranslationMode.STREAMING;
- }
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-}