You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/14 16:40:16 UTC
incubator-flink git commit: [FLINK-1324] [runtime] Trailing data is cached before the local input strategies are closed.
Repository: incubator-flink
Updated Branches:
refs/heads/master e47365bc0 -> 4cc6bb1db
[FLINK-1324] [runtime] Trailing data is cached before the local input strategies are closed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4cc6bb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4cc6bb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4cc6bb1d
Branch: refs/heads/master
Commit: 4cc6bb1db8390d8339b585171e1cac63a1903c8f
Parents: e47365b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 12 14:31:52 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 15:59:04 2014 +0100
----------------------------------------------------------------------
.../runtime/operators/RegularPactTask.java | 13 ++-
.../CompensatableDotProductCoGroup.java | 1 +
.../CompensatableDotProductMatch.java | 1 +
.../danglingpagerank/CompensatingMap.java | 1 +
...nIncompleteDynamicPathConsumptionITCase.java | 92 ++++++++++++++++++++
...onIncompleteStaticPathConsumptionITCase.java | 92 ++++++++++++++++++++
6 files changed, 198 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 2a598ca..5012a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -855,8 +855,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
for (int i = 0; i < numInputs; i++) {
final int memoryPages;
- final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
final boolean cached = this.config.isInputCached(i);
+ final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
this.inputIsAsyncMaterialized[i] = async;
this.inputIsCached[i] = cached;
@@ -888,6 +888,16 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
protected void resetAllInputs() throws Exception {
+
+ // first we need to make sure that caches consume remaining data
+ // NOTE: we need to do this before closing the local strategies
+ for (int i = 0; i < this.inputs.length; i++) {
+
+ if (this.inputIsCached[i] && this.resettableInputs[i] != null) {
+ this.resettableInputs[i].consumeAndCacheRemainingData();
+ }
+ }
+
// close all local-strategies. they will either get re-initialized, or we have
// read them now and their data is cached
for (int i = 0; i < this.localStrategies.length; i++) {
@@ -918,7 +928,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
if (this.tempBarriers[i] != null) {
this.inputs[i] = this.tempBarriers[i].getIterator();
} else if (this.resettableInputs[i] != null) {
- this.resettableInputs[i].consumeAndCacheRemainingData();
this.resettableInputs[i].reset();
this.inputs[i] = this.resettableInputs[i];
} else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
index b1d572a..bd1eeb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
@@ -29,6 +29,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+@SuppressWarnings("deprecation")
public class CompensatableDotProductCoGroup extends CoGroupFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
index 45e517f..0508886 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+@SuppressWarnings("deprecation")
public class CompensatableDotProductMatch extends JoinFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
index f8bc798..d8189ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
@@ -28,6 +28,7 @@ import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+@SuppressWarnings("deprecation")
public class CompensatingMap extends MapFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
new file mode 100644
index 0000000..e924a88
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
+ // Regression test for FLINK-1324: trailing dynamic-path input left by a merge join must not break the iteration.
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // the test data is constructed such that the merge join zig zag
+ // has an early out, leaving elements on the dynamic path input unconsumed
+ // (every "to" is even and every "from" is 1 or 3, so the join below never finds a match)
+ DataSet<Path> edges = env.fromElements(
+ new Path(1, 2),
+ new Path(1, 4),
+ new Path(3, 6),
+ new Path(3, 8),
+ new Path(1, 10),
+ new Path(1, 12),
+ new Path(3, 14),
+ new Path(3, 16),
+ new Path(1, 18),
+ new Path(1, 20) );
+
+ IterativeDataSet<Path> currentPaths = edges.iterate(10);
+
+ DataSet<Path> newPaths = currentPaths
+ .join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from") // sort-merge: the "merge join zig zag" referred to above
+ .with(new PathConnector())
+ .union(currentPaths).distinct("from", "to");
+
+ DataSet<Path> result = currentPaths.closeWith(newPaths);
+
+ result.output(new DiscardingOuputFormat<Path>()); // output is discarded; no assertions are made on the result
+
+ env.execute(); // NOTE(review): test presumably passes iff execution completes without an exception - confirm in JavaProgramTestBase
+ }
+
+ private static class PathConnector implements JoinFunction<Path, Path, Path> {
+ // Extends a path by one edge, producing (path.from, edge.to).
+ @Override
+ public Path join(Path path, Path edge) {
+ return new Path(path.from, edge.to);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class Path {
+ // Directed path "from" -> "to"; public fields and no-arg ctor presumably let Flink address fields by name - confirm.
+ public long from;
+ public long to;
+
+ public Path() {}
+
+ public Path(long from, long to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + from + "," + to + ")";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cc6bb1d/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
new file mode 100644
index 0000000..4e0274e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
+ // Regression test for FLINK-1324: trailing static-path input left by a merge join must not break the iteration.
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // the test data is constructed such that the merge join zig zag
+ // has an early out, leaving elements on the static path input unconsumed
+ // (every "to" is 1 or 3 and every "from" is even, so the join below never finds a match)
+ DataSet<Path> edges = env.fromElements(
+ new Path(2, 1),
+ new Path(4, 1),
+ new Path(6, 3),
+ new Path(8, 3),
+ new Path(10, 1),
+ new Path(12, 1),
+ new Path(14, 3),
+ new Path(16, 3),
+ new Path(18, 1),
+ new Path(20, 1) );
+
+ IterativeDataSet<Path> currentPaths = edges.iterate(10);
+
+ DataSet<Path> newPaths = currentPaths
+ .join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from") // sort-merge: the "merge join zig zag" referred to above
+ .with(new PathConnector())
+ .union(currentPaths).distinct("from", "to");
+
+ DataSet<Path> result = currentPaths.closeWith(newPaths);
+
+ result.output(new DiscardingOuputFormat<Path>()); // output is discarded; no assertions are made on the result
+
+ env.execute(); // NOTE(review): test presumably passes iff execution completes without an exception - confirm in JavaProgramTestBase
+ }
+
+ private static class PathConnector implements JoinFunction<Path, Path, Path> {
+ // Extends a path by one edge, producing (path.from, edge.to).
+ @Override
+ public Path join(Path path, Path edge) {
+ return new Path(path.from, edge.to);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class Path {
+ // Directed path "from" -> "to"; public fields and no-arg ctor presumably let Flink address fields by name - confirm.
+ public long from;
+ public long to;
+
+ public Path() {}
+
+ public Path(long from, long to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + from + "," + to + ")";
+ }
+ }
+}