You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/18 15:35:43 UTC

[1/3] flink git commit: [FLINK-2532] [streaming] fix variable name in StreamWindow

Repository: flink
Updated Branches:
  refs/heads/master 2f0412f16 -> a9d55d3ea


[FLINK-2532] [streaming] fix variable name in StreamWindow

Closes #1025


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9d55d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9d55d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9d55d3e

Branch: refs/heads/master
Commit: a9d55d3ea7a876510cbb643ff62d777d438291b9
Parents: 67087de
Author: Rucongzhang <zh...@huawei.com>
Authored: Sun Aug 16 16:44:00 2015 +0800
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 18 15:34:18 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/windowing/StreamWindow.java       | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9d55d3e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index ee2ea06..5a63940 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -153,24 +153,24 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		if (n > numElements) {
 			return split(window, numElements);
 		} else {
-			List<StreamWindow<X>> split = new ArrayList<StreamWindow<X>>();
+			List<StreamWindow<X>> splitsList = new ArrayList<StreamWindow<X>>();
 			int splitSize = numElements / n;
 
 			int index = -1;
 
 			StreamWindow<X> currentSubWindow = new StreamWindow<X>(window.windowID, n);
-			split.add(currentSubWindow);
+			splitsList.add(currentSubWindow);
 
 			for (X element : window) {
 				index++;
-				if (index == splitSize && split.size() < n) {
+				if (index == splitSize && splitsList.size() < n) {
 					currentSubWindow = new StreamWindow<X>(window.windowID, n);
-					split.add(currentSubWindow);
+					splitsList.add(currentSubWindow);
 					index = 0;
 				}
 				currentSubWindow.add(element);
 			}
-			return split;
+			return splitsList;
 		}
 	}
 


[3/3] flink git commit: [FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator

Posted by mb...@apache.org.
[FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator

Closes #994


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/143ec4f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/143ec4f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/143ec4f3

Branch: refs/heads/master
Commit: 143ec4f3132974a2ed3ea8683964a65bfe288a59
Parents: 2f0412f
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Aug 6 15:42:54 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 18 15:34:18 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       |  8 ++--
 .../api/operators/co/CoStreamFlatMap.java       |  4 ++
 .../api/operators/windowing/ParallelMerge.java  |  3 ++
 .../windowing/ParallelMergeOperator.java        | 43 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/143ec4f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index ba28fa4..e35592e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -30,11 +30,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamFilter;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
 import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
 import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
@@ -45,6 +43,7 @@ import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
 import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
 import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
 import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
+import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
@@ -147,7 +146,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 			DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
 			ReduceFunction<OUT> reduceFunction) {
 
-		CoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
+		ParallelMerge<OUT> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
 				: new ParallelMerge<OUT>(reduceFunction);
 
 		return reduced.discretizedStream
@@ -156,8 +155,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 				.transform(
 						"CoFlatMap",
 						reduced.discretizedStream.getType(),
-						new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
-								parallelMerger));
+						new ParallelMergeOperator<OUT>(parallelMerger));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/143ec4f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index d2bd107..1448ab8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -81,4 +81,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 			output.emitWatermark(new Watermark(combinedWatermark));
 		}
 	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/143ec4f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
index cd239fc..ce7d887 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
@@ -139,4 +139,7 @@ public class ParallelMerge<OUT> extends
 		this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
 	}
 
+	Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> getReceivedWindows() {
+		return receivedWindows;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/143ec4f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
new file mode 100644
index 0000000..74df3ad
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public class ParallelMergeOperator<OUT> extends CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
+
+	private ParallelMerge<OUT> parallelMerge;
+
+	public ParallelMergeOperator(ParallelMerge<OUT> parallelMerge) {
+		super(parallelMerge);
+		this.parallelMerge = parallelMerge;
+	}
+
+	@Override
+	public void close() throws Exception {
+		// emit remaining (partial) windows
+
+		for (Tuple2<StreamWindow<OUT>, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) {
+			getCollector().collect(receivedWindow.f0);
+		}
+
+		super.close();
+	}
+}


[2/3] flink git commit: [FLINK-2286] [streaming] ITCase for ParallelMerge behavior

Posted by mb...@apache.org.
[FLINK-2286] [streaming] ITCase for ParallelMerge behavior

Closes #1014


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67087dea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67087dea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67087dea

Branch: refs/heads/master
Commit: 67087dea698881b365debe17f847af0f192ee5cb
Parents: 143ec4f
Author: mbalassi <mb...@apache.org>
Authored: Tue Aug 11 15:51:30 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 18 15:34:18 2015 +0200

----------------------------------------------------------------------
 .../graph/test/example/PageRankITCase.java      |   2 +-
 .../windowing/ParallelMergeITCase.java          | 101 +++++++++++++++++++
 .../scala/table/test/PageRankTableITCase.java   |   2 +-
 .../flink/tez/test/PageRankBasicStepITCase.java |   2 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  11 +-
 .../exampleJavaPrograms/PageRankITCase.java     |   2 +-
 .../exampleScalaPrograms/PageRankITCase.java    |   2 +-
 7 files changed, 111 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
index 544cc66..cde959f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
@@ -61,7 +61,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 	@After
 	public void after() throws Exception{
-		compareKeyValueParisWithDelta(expected, resultPath, "\t", 0.01);
+		compareKeyValuePairsWithDelta(expected, resultPath, "\t", 0.01);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
new file mode 100644
index 0000000..b762d65
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that {@link ParallelMerge} does not swallow records of the
+ * last window.
+ */
+public class ParallelMergeITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+	protected final String input = "To be, or not to be,--that is the question:--" +
+									"Whether 'tis nobler in the mind to suffer";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", input);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		List<String> resultLines = new ArrayList<>();
+		readAllResultLines(resultLines, resultPath);
+
+		// check that result lines are not swallowed, as every element is expected to be in the
+		// last time window we either get the right output or no output at all
+		if (resultLines.isEmpty()){
+			Assert.fail();
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(input);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				text.flatMap(new Tokenizer())
+						.window(Time.of(1000, TimeUnit.MILLISECONDS))
+						.groupBy(0)
+						.sum(1)
+						.flatten();
+
+		counts.writeAsText(resultPath);
+
+		try {
+			env.execute();
+		} catch (RuntimeException e){
+			// might happen at closing the active window
+			// do nothing
+		}
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(Tuple2.of(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
index 5353b73..1816614 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
@@ -61,7 +61,7 @@ public class PageRankTableITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+		compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.01);
 	}
 
 	@Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
index 511c2cb..9a203fe 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
@@ -49,6 +49,6 @@ public class PageRankBasicStepITCase extends TezProgramTestBase {
 
     @Override
     protected void postSubmit() throws Exception {
-        compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.001);
+        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 9068fcc..c28347c 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -321,16 +321,15 @@ public class TestBaseUtils extends TestLogger {
 				Assert.fail(msg);
 			}
 		}
-
 	}
 
-	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
-											String delimiter, double maxDelta) throws Exception {
-		compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+	public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+														String delimiter, double maxDelta) throws Exception {
+		compareKeyValuePairsWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
 	}
 
-	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
-											String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
+	public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+														String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
index 1c66c3e..2d1519d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
@@ -63,7 +63,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 	@After
 	public void after() throws Exception{
-		compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+		compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/67087dea/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index f9c2566..6b9e550 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -67,7 +67,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 	@After
 	public void after() throws Exception{
-		compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+		compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
 	}
 
 	@Test