You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/14 16:40:16 UTC

incubator-flink git commit: [FLINK-1324] [runtime] Trailing data is cached before the local input strategies are closed.

Repository: incubator-flink
Updated Branches:
  refs/heads/master e47365bc0 -> 4cc6bb1db


[FLINK-1324] [runtime] Trailing data is cached before the local input strategies are closed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4cc6bb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4cc6bb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4cc6bb1d

Branch: refs/heads/master
Commit: 4cc6bb1db8390d8339b585171e1cac63a1903c8f
Parents: e47365b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 12 14:31:52 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 15:59:04 2014 +0100

----------------------------------------------------------------------
 .../runtime/operators/RegularPactTask.java      | 13 ++-
 .../CompensatableDotProductCoGroup.java         |  1 +
 .../CompensatableDotProductMatch.java           |  1 +
 .../danglingpagerank/CompensatingMap.java       |  1 +
 ...nIncompleteDynamicPathConsumptionITCase.java | 92 ++++++++++++++++++++
 ...onIncompleteStaticPathConsumptionITCase.java | 92 ++++++++++++++++++++
 6 files changed, 198 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 2a598ca..5012a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -855,8 +855,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 		for (int i = 0; i < numInputs; i++) {
 			final int memoryPages;
-			final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
 			final boolean cached =  this.config.isInputCached(i);
+			final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
 
 			this.inputIsAsyncMaterialized[i] = async;
 			this.inputIsCached[i] = cached;
@@ -888,6 +888,16 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	protected void resetAllInputs() throws Exception {
+		
+		// first, let every cached input consume and cache the data it has not yet read
+		// NOTE: this must happen before the local strategies are closed below
+		for (int i = 0; i < this.inputs.length; i++) {
+			
+			if (this.inputIsCached[i] && this.resettableInputs[i] != null) {
+				this.resettableInputs[i].consumeAndCacheRemainingData();
+			}
+		}
+		
 		// close all local-strategies. they will either get re-initialized, or we have
 		// read them now and their data is cached
 		for (int i = 0; i < this.localStrategies.length; i++) {
@@ -918,7 +928,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 					if (this.tempBarriers[i] != null) {
 						this.inputs[i] = this.tempBarriers[i].getIterator();
 					} else if (this.resettableInputs[i] != null) {
-						this.resettableInputs[i].consumeAndCacheRemainingData();
 						this.resettableInputs[i].reset();
 						this.inputs[i] = this.resettableInputs[i];
 					} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
index b1d572a..bd1eeb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
@@ -29,6 +29,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class CompensatableDotProductCoGroup extends CoGroupFunction {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
index 45e517f..0508886 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class CompensatableDotProductMatch extends JoinFunction {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
index f8bc798..d8189ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
@@ -28,6 +28,7 @@ import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class CompensatingMap extends MapFunction {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
new file mode 100644
index 0000000..e924a88
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// the test data is constructed such that the zig-zag of the sort-merge join
+		// terminates early, leaving elements on the dynamic path input unconsumed
+		
+		DataSet<Path> edges = env.fromElements(
+				new Path(1, 2),
+				new Path(1, 4),
+				new Path(3, 6),
+				new Path(3, 8),
+				new Path(1, 10),
+				new Path(1, 12),
+				new Path(3, 14),
+				new Path(3, 16),
+				new Path(1, 18),
+				new Path(1, 20) );
+		
+		IterativeDataSet<Path> currentPaths = edges.iterate(10);
+		
+		DataSet<Path> newPaths = currentPaths
+				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
+					.with(new PathConnector())
+				.union(currentPaths).distinct("from", "to");
+		
+		DataSet<Path> result = currentPaths.closeWith(newPaths);
+		
+		result.output(new DiscardingOuputFormat<Path>());
+		
+		env.execute();
+	}
+	
+	private static class PathConnector implements JoinFunction<Path, Path, Path> {
+		
+		@Override
+		public Path join(Path path, Path edge)  {
+			return new Path(path.from, edge.to);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Path {
+		
+		public long from;
+		public long to;
+		
+		public Path() {}
+		
+		public Path(long from, long to) {
+			this.from = from;
+			this.to = to;
+		}
+		
+		@Override
+		public String toString() {
+			return "(" + from + "," + to + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
new file mode 100644
index 0000000..4e0274e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	
+		// the test data is constructed such that the zig-zag of the sort-merge join
+		// terminates early, leaving elements on the static path input unconsumed
+		
+		DataSet<Path> edges = env.fromElements(
+				new Path(2, 1),
+				new Path(4, 1),
+				new Path(6, 3),
+				new Path(8, 3),
+				new Path(10, 1),
+				new Path(12, 1),
+				new Path(14, 3),
+				new Path(16, 3),
+				new Path(18, 1),
+				new Path(20, 1) );
+		
+		IterativeDataSet<Path> currentPaths = edges.iterate(10);
+		
+		DataSet<Path> newPaths = currentPaths
+				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
+					.with(new PathConnector())
+				.union(currentPaths).distinct("from", "to");
+		
+		DataSet<Path> result = currentPaths.closeWith(newPaths);
+		
+		result.output(new DiscardingOuputFormat<Path>());
+		
+		env.execute();
+	}
+	
+	private static class PathConnector implements JoinFunction<Path, Path, Path> {
+		
+		@Override
+		public Path join(Path path, Path edge)  {
+			return new Path(path.from, edge.to);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Path {
+		
+		public long from;
+		public long to;
+		
+		public Path() {}
+		
+		public Path(long from, long to) {
+			this.from = from;
+			this.to = to;
+		}
+		
+		@Override
+		public String toString() {
+			return "(" + from + "," + to + ")";
+		}
+	}
+}