You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:47 UTC
[09/17] incubator-beam git commit: Fix Checkstyle Errors in FlinkStreamingTransformTranslators
Fix Checkstyle Errors in FlinkStreamingTransformTranslators
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff34f9e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff34f9e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff34f9e8
Branch: refs/heads/master
Commit: ff34f9e81867a656e4d9dd0987063c58cbb1de88
Parents: 1de76b7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Jul 10 17:01:38 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 155 ++++++++++++-------
1 file changed, 102 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff34f9e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index fff629c..8167623 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -106,7 +106,9 @@ public class FlinkStreamingTransformTranslators {
// --------------------------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
- private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+ private static final Map<
+ Class<? extends PTransform>,
+ FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
// here you can find all the available translators.
static {
@@ -125,7 +127,8 @@ public class FlinkStreamingTransformTranslators {
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
}
- public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
+ PTransform<?, ?> transform) {
return TRANSLATORS.get(transform.getClass());
}
@@ -133,21 +136,24 @@ public class FlinkStreamingTransformTranslators {
// Transformation Implementations
// --------------------------------------------------------------------------------------------
- private static class CreateStreamingTranslator<OUT> implements
- FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
+ private static class CreateStreamingTranslator<OutputT> implements
+ FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OutputT>> {
@Override
- public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) {
- PCollection<OUT> output = context.getOutput(transform);
- Iterable<OUT> elements = transform.getElements();
+ public void translateNode(
+ Create.Values<OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<OutputT> output = context.getOutput(transform);
+ Iterable<OutputT> elements = transform.getElements();
// we need to serialize the elements to byte arrays, since they might contain
// elements that are not serializable by Java serialization. We deserialize them
// in the FlatMap function using the Coder.
List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OUT> elementCoder = output.getCoder();
- for (OUT element: elements) {
+ Coder<OutputT> elementCoder = output.getCoder();
+ for (OutputT element: elements) {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
try {
elementCoder.encode(element, bao, Coder.Context.OUTER);
@@ -160,25 +166,33 @@ public class FlinkStreamingTransformTranslators {
DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
- FlinkStreamingCreateFunction<Integer, OUT> createFunction =
+ FlinkStreamingCreateFunction<Integer, OutputT> createFunction =
new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
- WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder);
- TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder);
+ WindowedValue.ValueOnlyWindowedValueCoder<OutputT> windowCoder =
+ WindowedValue.getValueOnlyCoder(elementCoder);
+
+ TypeInformation<WindowedValue<OutputT>> outputType = new CoderTypeInformation<>(windowCoder);
- DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
- .returns(outputType);
+ DataStream<WindowedValue<OutputT>> outputDataStream = initDataSet
+ .flatMap(createFunction).returns(outputType);
context.setOutputDataStream(output, outputDataStream);
}
}
- private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+ private static class TextIOWriteBoundStreamingTranslator<T>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ TextIO.Write.Bound<T>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
@Override
- public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(
+ TextIO.Write.Bound<T> transform,
+ FlinkStreamingTranslationContext context) {
PValue input = context.getInput(transform);
DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
@@ -189,17 +203,25 @@ public class FlinkStreamingTransformTranslators {
String shardNameTemplate = transform.getShardNameTemplate();
// TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
-
- DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
- @Override
- public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
- out.collect(value.getValue().toString());
- }
- });
- DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+ LOG.warn(
+ "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+ needsValidation);
+ LOG.warn(
+ "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+ filenameSuffix);
+ LOG.warn(
+ "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+ shardNameTemplate);
+
+ DataStream<String> dataSink = inputDataStream
+ .flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
+ @Override
+ public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
+ out.collect(value.getValue().toString());
+ }
+ });
+ DataStreamSink<String> output =
+ dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
if (numShards > 0) {
output.setParallelism(numShards);
@@ -207,7 +229,8 @@ public class FlinkStreamingTransformTranslators {
}
}
- private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+ private static class WriteSinkStreamingTranslator<T>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
@Override
public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
@@ -216,7 +239,8 @@ public class FlinkStreamingTransformTranslators {
Sink<T> sink = transform.getSink();
if (!(sink instanceof UnboundedFlinkSink)) {
- throw new UnsupportedOperationException("At the time, only unbounded Flink sinks are supported.");
+ throw new UnsupportedOperationException(
+ "At the time, only unbounded Flink sinks are supported.");
}
DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
@@ -251,16 +275,21 @@ public class FlinkStreamingTransformTranslators {
}
}
- private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+ private static class UnboundedReadSourceTranslator<T>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
@Override
- public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(
+ Read.Unbounded<T> transform,
+ FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);
DataStream<WindowedValue<T>> source;
if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
@SuppressWarnings("unchecked")
- UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource();
+ UnboundedFlinkSource<T> flinkSourceFunction =
+ (UnboundedFlinkSource<T>) transform.getSource();
+
final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
DataStream<T> flinkSource = context.getExecutionEnvironment()
@@ -290,9 +319,12 @@ public class FlinkStreamingTransformTranslators {
context.getPipelineOptions(),
transform.getSource(),
context.getExecutionEnvironment().getParallelism());
- source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName());
+ source = context
+ .getExecutionEnvironment()
+ .addSource(sourceWrapper).name(transform.getName());
} catch (Exception e) {
- throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e);
+ throw new RuntimeException(
+ "Error while translating UnboundedSource: " + transform.getSource(), e);
}
}
@@ -300,33 +332,41 @@ public class FlinkStreamingTransformTranslators {
}
}
- private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> {
+ private static class ParDoBoundStreamingTranslator<InputT, OutputT>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ ParDo.Bound<InputT, OutputT>> {
@Override
- public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(
+ ParDo.Bound<InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
- WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy();
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getOutput(transform).getWindowingStrategy();
- TypeInformation<WindowedValue<OUT>> typeInfo = context.getTypeInfo(context.getOutput(transform));
+ TypeInformation<WindowedValue<OutputT>> typeInfo =
+ context.getTypeInfo(context.getOutput(transform));
- DoFnOperator<IN, OUT, WindowedValue<OUT>> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>(
transform.getFn(),
- new TupleTag<OUT>("main output"),
+ new TupleTag<OutputT>("main output"),
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OUT>>(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
windowingStrategy,
new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
context.getPipelineOptions());
- DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream
- .transform(transform.getName(), typeInfo, doFnOperator);
+ DataStream<WindowedValue<InputT>> inputDataStream =
+ context.getInputDataStream(context.getInput(transform));
+
+ SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream =
+ inputDataStream.transform(transform.getName(), typeInfo, doFnOperator);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
}
}
- public static class WindowBoundTranslator<T>
+ private static class WindowBoundTranslator<T>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
@Override
@@ -393,11 +433,13 @@ public class FlinkStreamingTransformTranslators {
}
}
- public static class GroupByKeyTranslator<K, InputT>
+ private static class GroupByKeyTranslator<K, InputT>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
@Override
- public void translateNode(GroupByKey<K, InputT> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(
+ GroupByKey<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
PCollection<KV<K, InputT>> input = context.getInput(transform);
@@ -471,7 +513,7 @@ public class FlinkStreamingTransformTranslators {
}
}
- public static class CombinePerKeyTranslator<K, InputT, OutputT>
+ private static class CombinePerKeyTranslator<K, InputT, OutputT>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
Combine.PerKey<K, InputT, OutputT>> {
@@ -566,10 +608,14 @@ public class FlinkStreamingTransformTranslators {
}
}
- public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ private static class FlattenPCollectionTranslator<T>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ Flatten.FlattenPCollectionList<T>> {
@Override
- public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(
+ Flatten.FlattenPCollectionList<T> transform,
+ FlinkStreamingTranslationContext context) {
List<PCollection<T>> allInputs = context.getInput(transform).getAll();
DataStream<T> result = null;
for (PCollection<T> collection : allInputs) {
@@ -580,7 +626,7 @@ public class FlinkStreamingTransformTranslators {
}
}
- public static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+ private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
ParDo.BoundMulti<InputT, OutputT>> {
@@ -635,7 +681,10 @@ public class FlinkStreamingTransformTranslators {
}
}
- private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) {
+ private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+ TupleTag<?> mainTag,
+ Set<TupleTag<?>> secondaryTags) {
+
Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
int count = 0;
tagToLabelMap.put(mainTag, count++);