You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:47 UTC

[09/17] incubator-beam git commit: Fix Checkstyle Errors in FlinkStreamingTransformTranslators

Fix Checkstyle Errors in FlinkStreamingTransformTranslators


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff34f9e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff34f9e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff34f9e8

Branch: refs/heads/master
Commit: ff34f9e81867a656e4d9dd0987063c58cbb1de88
Parents: 1de76b7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Jul 10 17:01:38 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 155 ++++++++++++-------
 1 file changed, 102 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff34f9e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index fff629c..8167623 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -106,7 +106,9 @@ public class FlinkStreamingTransformTranslators {
   // --------------------------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
 
   // here you can find all the available translators.
   static {
@@ -125,7 +127,8 @@ public class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
   }
 
-  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
     return TRANSLATORS.get(transform.getClass());
   }
 
@@ -133,21 +136,24 @@ public class FlinkStreamingTransformTranslators {
   //  Transformation Implementations
   // --------------------------------------------------------------------------------------------
 
-  private static class CreateStreamingTranslator<OUT> implements
-      FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
+  private static class CreateStreamingTranslator<OutputT> implements
+      FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OutputT>> {
 
     @Override
-    public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) {
-      PCollection<OUT> output = context.getOutput(transform);
-      Iterable<OUT> elements = transform.getElements();
+    public void translateNode(
+        Create.Values<OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<OutputT> output = context.getOutput(transform);
+      Iterable<OutputT> elements = transform.getElements();
 
       // we need to serialize the elements to byte arrays, since they might contain
       // elements that are not serializable by Java serialization. We deserialize them
       // in the FlatMap function using the Coder.
 
       List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> elementCoder = output.getCoder();
-      for (OUT element: elements) {
+      Coder<OutputT> elementCoder = output.getCoder();
+      for (OutputT element: elements) {
         ByteArrayOutputStream bao = new ByteArrayOutputStream();
         try {
           elementCoder.encode(element, bao, Coder.Context.OUTER);
@@ -160,25 +166,33 @@ public class FlinkStreamingTransformTranslators {
 
       DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
 
-      FlinkStreamingCreateFunction<Integer, OUT> createFunction =
+      FlinkStreamingCreateFunction<Integer, OutputT> createFunction =
           new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
 
-      WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder);
-      TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder);
+      WindowedValue.ValueOnlyWindowedValueCoder<OutputT> windowCoder =
+          WindowedValue.getValueOnlyCoder(elementCoder);
+
+      TypeInformation<WindowedValue<OutputT>> outputType = new CoderTypeInformation<>(windowCoder);
 
-      DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
-          .returns(outputType);
+      DataStream<WindowedValue<OutputT>> outputDataStream = initDataSet
+          .flatMap(createFunction).returns(outputType);
 
       context.setOutputDataStream(output, outputDataStream);
     }
   }
 
 
-  private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+  private static class TextIOWriteBoundStreamingTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        TextIO.Write.Bound<T>> {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
 
     @Override
-    public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        TextIO.Write.Bound<T> transform,
+        FlinkStreamingTranslationContext context) {
       PValue input = context.getInput(transform);
       DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
 
@@ -189,17 +203,25 @@ public class FlinkStreamingTransformTranslators {
       String shardNameTemplate = transform.getShardNameTemplate();
 
       // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
-
-      DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
-        @Override
-        public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
-          out.collect(value.getValue().toString());
-        }
-      });
-      DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+      LOG.warn(
+          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+          needsValidation);
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+          filenameSuffix);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
+
+      DataStream<String> dataSink = inputDataStream
+          .flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
+            @Override
+            public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
+              out.collect(value.getValue().toString());
+            }
+          });
+      DataStreamSink<String> output =
+          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
 
       if (numShards > 0) {
         output.setParallelism(numShards);
@@ -207,7 +229,8 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+  private static class WriteSinkStreamingTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
 
     @Override
     public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
@@ -216,7 +239,8 @@ public class FlinkStreamingTransformTranslators {
 
       Sink<T> sink = transform.getSink();
       if (!(sink instanceof UnboundedFlinkSink)) {
-        throw new UnsupportedOperationException("At the time, only unbounded Flink sinks are supported.");
+        throw new UnsupportedOperationException(
+            "At the time, only unbounded Flink sinks are supported.");
       }
 
       DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
@@ -251,16 +275,21 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+  private static class UnboundedReadSourceTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 
     @Override
-    public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        Read.Unbounded<T> transform,
+        FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
       DataStream<WindowedValue<T>> source;
       if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
         @SuppressWarnings("unchecked")
-        UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource();
+        UnboundedFlinkSource<T> flinkSourceFunction =
+            (UnboundedFlinkSource<T>) transform.getSource();
+
         final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
 
         DataStream<T> flinkSource = context.getExecutionEnvironment()
@@ -290,9 +319,12 @@ public class FlinkStreamingTransformTranslators {
                   context.getPipelineOptions(),
                   transform.getSource(),
                   context.getExecutionEnvironment().getParallelism());
-          source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName());
+          source = context
+              .getExecutionEnvironment()
+              .addSource(sourceWrapper).name(transform.getName());
         } catch (Exception e) {
-          throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e);
+          throw new RuntimeException(
+              "Error while translating UnboundedSource: " + transform.getSource(), e);
         }
       }
 
@@ -300,33 +332,41 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> {
+  private static class ParDoBoundStreamingTranslator<InputT, OutputT>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        ParDo.Bound<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        ParDo.Bound<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
 
-      WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy();
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getOutput(transform).getWindowingStrategy();
 
-      TypeInformation<WindowedValue<OUT>> typeInfo = context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<WindowedValue<OutputT>> typeInfo =
+          context.getTypeInfo(context.getOutput(transform));
 
-      DoFnOperator<IN, OUT, WindowedValue<OUT>> doFnOperator = new DoFnOperator<>(
+      DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>(
           transform.getFn(),
-          new TupleTag<OUT>("main output"),
+          new TupleTag<OutputT>("main output"),
           Collections.<TupleTag<?>>emptyList(),
-          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OUT>>(),
+          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
           windowingStrategy,
           new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
           context.getPipelineOptions());
 
-      DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
-      SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream
-          .transform(transform.getName(), typeInfo, doFnOperator);
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+
+      SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream =
+          inputDataStream.transform(transform.getName(), typeInfo, doFnOperator);
 
       context.setOutputDataStream(context.getOutput(transform), outDataStream);
     }
   }
 
-  public static class WindowBoundTranslator<T>
+  private static class WindowBoundTranslator<T>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
 
     @Override
@@ -393,11 +433,13 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class GroupByKeyTranslator<K, InputT>
+  private static class GroupByKeyTranslator<K, InputT>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
 
     @Override
-    public void translateNode(GroupByKey<K, InputT> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
 
       PCollection<KV<K, InputT>> input = context.getInput(transform);
 
@@ -471,7 +513,7 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class CombinePerKeyTranslator<K, InputT, OutputT>
+  private static class CombinePerKeyTranslator<K, InputT, OutputT>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         Combine.PerKey<K, InputT, OutputT>> {
 
@@ -566,10 +608,14 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+  private static class FlattenPCollectionTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        Flatten.FlattenPCollectionList<T>> {
 
     @Override
-    public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        Flatten.FlattenPCollectionList<T> transform,
+        FlinkStreamingTranslationContext context) {
       List<PCollection<T>> allInputs = context.getInput(transform).getAll();
       DataStream<T> result = null;
       for (PCollection<T> collection : allInputs) {
@@ -580,7 +626,7 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+  private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         ParDo.BoundMulti<InputT, OutputT>> {
 
@@ -635,7 +681,10 @@ public class FlinkStreamingTransformTranslators {
       }
     }
 
-    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) {
+    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+        TupleTag<?> mainTag,
+        Set<TupleTag<?>> secondaryTags) {
+
       Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
       int count = 0;
       tagToLabelMap.put(mainTag, count++);