You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/20 16:01:45 UTC
[1/3] beam git commit: Exclude UsesSplittableParDoWithWindowedSideInputs in Flink Stream Runner Tests
Repository: beam
Updated Branches:
refs/heads/master 4121ec490 -> 56398fcba
Exclude UsesSplittableParDoWithWindowedSideInputs in Flink Stream Runner Tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/252ebf15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/252ebf15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/252ebf15
Branch: refs/heads/master
Commit: 252ebf15e002481f3c2648035e04dd3f4b57832d
Parents: 0c030d4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 16:27:51 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:19 2017 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/252ebf15/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 808219b..6e1d3c5 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -94,7 +94,8 @@
org.apache.beam.sdk.testing.UsesMapState,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
- org.apache.beam.sdk.testing.UsesTestStream
+ org.apache.beam.sdk.testing.UsesTestStream,
+ org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
[3/3] beam git commit: This closes #2537
Posted by al...@apache.org.
This closes #2537
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56398fcb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56398fcb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56398fcb
Branch: refs/heads/master
Commit: 56398fcba247c3e5416dc3d3f41c73193856fddc
Parents: 4121ec4 252ebf1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 18:01:28 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:28 2017 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 3 +-
.../FlinkStreamingTransformTranslators.java | 55 --------------------
.../flink/streaming/GroupByNullKeyTest.java | 11 ++--
3 files changed, 10 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-1886] Remove TextIO override in Flink runner
Posted by al...@apache.org.
[BEAM-1886] Remove TextIO override in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c030d49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c030d49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c030d49
Branch: refs/heads/master
Commit: 0c030d49a79ce20e25a8af3110442130dab8899c
Parents: 4121ec4
Author: JingsongLi <lz...@aliyun.com>
Authored: Fri Apr 14 11:22:43 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 20 18:01:19 2017 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 55 --------------------
.../flink/streaming/GroupByNullKeyTest.java | 11 ++--
2 files changed, 8 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 123d5e7..71f315d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -80,10 +79,8 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -92,8 +89,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class contains all the mappings between Beam and Flink
@@ -116,7 +111,6 @@ class FlinkStreamingTransformTranslators {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
TRANSLATORS.put(
@@ -145,55 +139,6 @@ class FlinkStreamingTransformTranslators {
// Transformation Implementations
// --------------------------------------------------------------------------------------------
- private static class TextIOWriteBoundStreamingTranslator
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
- @Override
- public void translateNode(
- TextIO.Write.Bound transform,
- FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
- DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- boolean needsValidation = transform.needsValidation();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn(
- "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
- needsValidation);
- LOG.warn(
- "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
- filenameSuffix);
- LOG.warn(
- "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
- shardNameTemplate);
-
- DataStream<String> dataSink = inputDataStream
- .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
- @Override
- public void flatMap(
- WindowedValue<String> value,
- Collector<String> out)
- throws Exception {
- out.collect(value.getValue());
- }
- });
- DataStreamSink<String> output =
- dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
- if (numShards > 0) {
- output.setParallelism(numShards);
- }
- }
- }
-
private static class UnboundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 663b910..82d9f4f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink.streaming;
import com.google.common.base.Joiner;
+import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.runners.flink.FlinkTestPipeline;
@@ -41,7 +42,7 @@ import org.joda.time.Instant;
*/
public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
-
+ protected String resultDir;
protected String resultPath;
static final String[] EXPECTED_RESULT = new String[] {
@@ -53,12 +54,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
@Override
protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+ // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
+ // So tempFile need have a parent to compare.
+ File resultParent = createAndRegisterTempFile("result");
+ resultDir = resultParent.toURI().toString();
+ resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
}
/**