You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/20 16:01:45 UTC

[1/3] beam git commit: Exclude UsesSplittableParDoWithWindowedSideInputs in Flink Stream Runner Tests

Repository: beam
Updated Branches:
  refs/heads/master 4121ec490 -> 56398fcba


Exclude UsesSplittableParDoWithWindowedSideInputs in Flink Stream Runner Tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/252ebf15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/252ebf15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/252ebf15

Branch: refs/heads/master
Commit: 252ebf15e002481f3c2648035e04dd3f4b57832d
Parents: 0c030d4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 16:27:51 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:19 2017 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/252ebf15/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 808219b..6e1d3c5 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -94,7 +94,8 @@
                     org.apache.beam.sdk.testing.UsesMapState,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream
+                    org.apache.beam.sdk.testing.UsesTestStream,
+                    org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>


[3/3] beam git commit: This closes #2537

Posted by al...@apache.org.
This closes #2537


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56398fcb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56398fcb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56398fcb

Branch: refs/heads/master
Commit: 56398fcba247c3e5416dc3d3f41c73193856fddc
Parents: 4121ec4 252ebf1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 18:01:28 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:28 2017 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  3 +-
 .../FlinkStreamingTransformTranslators.java     | 55 --------------------
 .../flink/streaming/GroupByNullKeyTest.java     | 11 ++--
 3 files changed, 10 insertions(+), 59 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: [BEAM-1886] Remove TextIO override in Flink runner

Posted by al...@apache.org.
[BEAM-1886] Remove TextIO override in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c030d49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c030d49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c030d49

Branch: refs/heads/master
Commit: 0c030d49a79ce20e25a8af3110442130dab8899c
Parents: 4121ec4
Author: JingsongLi <lz...@aliyun.com>
Authored: Fri Apr 14 11:22:43 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:19 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 55 --------------------
 .../flink/streaming/GroupByNullKeyTest.java     | 11 ++--
 2 files changed, 8 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 123d5e7..71f315d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -80,10 +79,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -92,8 +89,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class contains all the mappings between Beam and Flink
@@ -116,7 +111,6 @@ class FlinkStreamingTransformTranslators {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
     TRANSLATORS.put(
@@ -145,55 +139,6 @@ class FlinkStreamingTransformTranslators {
   //  Transformation Implementations
   // --------------------------------------------------------------------------------------------
 
-  private static class TextIOWriteBoundStreamingTranslator
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
-
-    private static final Logger LOG =
-        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
-    @Override
-    public void translateNode(
-        TextIO.Write.Bound transform,
-        FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
-          needsValidation);
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      DataStream<String> dataSink = inputDataStream
-          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
-            @Override
-            public void flatMap(
-                WindowedValue<String> value,
-                Collector<String> out)
-                throws Exception {
-              out.collect(value.getValue());
-            }
-          });
-      DataStreamSink<String> output =
-          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
-      if (numShards > 0) {
-        output.setParallelism(numShards);
-      }
-    }
-  }
-
   private static class UnboundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 663b910..82d9f4f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.streaming;
 
 import com.google.common.base.Joiner;
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import org.apache.beam.runners.flink.FlinkTestPipeline;
@@ -41,7 +42,7 @@ import org.joda.time.Instant;
  */
 public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
 
-
+  protected String resultDir;
   protected String resultPath;
 
   static final String[] EXPECTED_RESULT = new String[] {
@@ -53,12 +54,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
 
   @Override
   protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
+    // Beam Write will add a shard suffix to the file name; see ShardNameTemplate.
+    // So the temp file needs a parent directory to compare against.
+    File resultParent = createAndRegisterTempFile("result");
+    resultDir = resultParent.toURI().toString();
+    resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
 
   @Override
   protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
   }
 
   /**