You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/19 15:36:06 UTC
[1/2] flink git commit: [FLINK-2286] [streaming] Wrapped
ParallelMerge into stream operator
Repository: flink
Updated Branches:
refs/heads/release-0.9 e8802f90a -> c15831731
[FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator
Closes #994
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87119749
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87119749
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87119749
Branch: refs/heads/release-0.9
Commit: 87119749aa2e6b129d1099b2ede9ad5854b33a55
Parents: e8802f9
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Aug 6 15:42:54 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 18 15:36:31 2015 +0200
----------------------------------------------------------------------
.../api/datastream/DiscretizedStream.java | 8 ++--
.../api/operators/co/CoStreamFlatMap.java | 4 ++
.../api/operators/windowing/ParallelMerge.java | 3 ++
.../windowing/ParallelMergeOperator.java | 43 ++++++++++++++++++++
4 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index ba28fa4..e35592e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -30,11 +30,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
@@ -45,6 +43,7 @@ import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
+import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
@@ -147,7 +146,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
ReduceFunction<OUT> reduceFunction) {
- CoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
+ ParallelMerge<OUT> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
: new ParallelMerge<OUT>(reduceFunction);
return reduced.discretizedStream
@@ -156,8 +155,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
.transform(
"CoFlatMap",
reduced.discretizedStream.getType(),
- new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
- parallelMerger));
+ new ParallelMergeOperator<OUT>(parallelMerger));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index e3662d6..777b165 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -41,4 +41,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
public void processElement2(IN2 element) throws Exception {
userFunction.flatMap2(element, output);
}
+
+ protected TimestampedCollector<OUT> getCollector() {
+ return collector;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
index cd239fc..ce7d887 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
@@ -139,4 +139,7 @@ public class ParallelMerge<OUT> extends
this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
}
+ Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> getReceivedWindows() {
+ return receivedWindows;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
new file mode 100644
index 0000000..74df3ad
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public class ParallelMergeOperator<OUT> extends CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
+
+ private ParallelMerge<OUT> parallelMerge;
+
+ public ParallelMergeOperator(ParallelMerge<OUT> parallelMerge) {
+ super(parallelMerge);
+ this.parallelMerge = parallelMerge;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // emit remaining (partial) windows
+
+ for (Tuple2<StreamWindow<OUT>, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) {
+ getCollector().collect(receivedWindow.f0);
+ }
+
+ super.close();
+ }
+}
[2/2] flink git commit: [FLINK-2286] [streaming] ITCase for
ParallelMerge behavior
Posted by mb...@apache.org.
[FLINK-2286] [streaming] ITCase for ParallelMerge behavior
Closes #1014
Adapted for version 0.9
Conflicts:
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1583173
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1583173
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1583173
Branch: refs/heads/release-0.9
Commit: c15831731fae78681870385a49f7bd70e54f67c2
Parents: 8711974
Author: mbalassi <mb...@apache.org>
Authored: Tue Aug 11 15:51:30 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Aug 19 14:07:22 2015 +0200
----------------------------------------------------------------------
.../api/operators/co/CoStreamFlatMap.java | 5 +-
.../windowing/ParallelMergeITCase.java | 101 +++++++++++++++++++
.../scala/table/test/PageRankTableITCase.java | 2 +-
.../flink/tez/test/PageRankBasicStepITCase.java | 2 +-
.../apache/flink/test/util/TestBaseUtils.java | 11 +-
.../exampleJavaPrograms/PageRankITCase.java | 2 +-
.../exampleScalaPrograms/PageRankITCase.java | 2 +-
7 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 777b165..67852de 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.util.Collector;
public class CoStreamFlatMap<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
@@ -42,7 +43,7 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
userFunction.flatMap2(element, output);
}
- protected TimestampedCollector<OUT> getCollector() {
- return collector;
+ protected Collector<OUT> getCollector() {
+ return output;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
new file mode 100644
index 0000000..e1c36a2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that {@link ParallelMerge} does not swallow records of the
+ * last window.
+ */
+public class ParallelMergeITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+ protected final String input = "To be, or not to be,--that is the question:--" +
+ "Whether 'tis nobler in the mind to suffer";
+
+ @Override
+ protected void preSubmit() throws Exception {
+ textPath = createTempFile("text.txt", input);
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ List<String> resultLines = new ArrayList<String>();
+ readAllResultLines(resultLines, resultPath);
+
+ // check that result lines are not swallowed: as every element is expected to be in the
+ // last time window, we either get the right output or no output at all
+ if (resultLines.isEmpty()){
+ Assert.fail();
+ }
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> text = env.fromElements(input);
+
+ DataStream<Tuple2<String, Integer>> counts =
+ text.flatMap(new Tokenizer())
+ .window(Time.of(1000, TimeUnit.MILLISECONDS))
+ .groupBy(0)
+ .sum(1)
+ .flatten();
+
+ counts.writeAsText(resultPath);
+
+ try {
+ env.execute();
+ } catch (RuntimeException e){
+ // may be thrown while closing the active window;
+ // deliberately ignored
+ }
+ }
+
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
index 5353b73..1816614 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
@@ -61,7 +61,7 @@ public class PageRankTableITCase extends JavaProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+ compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.01);
}
@Parameters
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
index 511c2cb..9a203fe 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
@@ -49,6 +49,6 @@ public class PageRankBasicStepITCase extends TezProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.001);
+ compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 92d66d9..acf8cf8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -308,16 +308,15 @@ public class TestBaseUtils {
Assert.fail(msg);
}
}
-
}
- public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
- String delimiter, double maxDelta) throws Exception {
- compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+ public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+ String delimiter, double maxDelta) throws Exception {
+ compareKeyValuePairsWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
}
- public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
- String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
+ public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+ String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
index 1c66c3e..2d1519d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
@@ -63,7 +63,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
@After
public void after() throws Exception{
- compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+ compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c1583173/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index f9c2566..6b9e550 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -67,7 +67,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
@After
public void after() throws Exception{
- compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+ compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
}
@Test