You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 15:32:20 UTC
[2/2] flink git commit: [FLINK-1652] fixes superstep increment in
CollectionExecutor
[FLINK-1652] fixes superstep increment in CollectionExecutor
This closes #464
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9077a53b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9077a53b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9077a53b
Branch: refs/heads/master
Commit: 9077a53bf536e96f04a818f029ddc6cf4a674fe4
Parents: e795c43
Author: vasia <va...@gmail.com>
Authored: Mon Mar 9 00:11:20 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 16:21:14 2015 +0200
----------------------------------------------------------------------
.../common/operators/CollectionExecutor.java | 21 +++--
.../test/CollectionModeSuperstepITCase.java | 87 ++++++++++++++++++++
.../operations/DegreesWithExceptionITCase.java | 1 -
3 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 2f9ae9a..78ad930 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -71,6 +71,8 @@ public class CollectionExecutor {
private final ExecutionConfig executionConfig;
+ private int iterationSuperstep;
+
// --------------------------------------------------------------------------------------------
public CollectionExecutor(ExecutionConfig executionConfig) {
@@ -183,7 +185,7 @@ public class CollectionExecutor {
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
- new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+ new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
@@ -225,7 +227,7 @@ public class CollectionExecutor {
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
- new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+ new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
@@ -279,7 +281,10 @@ public class CollectionExecutor {
// set the input to the current partial solution
this.intermediateResults.put(iteration.getPartialSolution(), currentResult);
-
+
+ // set the superstep number
+ iterationSuperstep = superstep;
+
// grab the current iteration result
currentResult = (List<T>) execute(iteration.getNextPartialSolution(), superstep);
@@ -373,6 +378,9 @@ public class CollectionExecutor {
this.intermediateResults.put(iteration.getSolutionSet(), currentSolution);
this.intermediateResults.put(iteration.getWorkset(), currentWorkset);
+ // set the superstep number
+ iterationSuperstep = superstep;
+
// grab the current iteration result
List<T> solutionSetDelta = (List<T>) execute(iteration.getSolutionSetDelta(), superstep);
this.intermediateResults.put(iteration.getSolutionSetDelta(), solutionSetDelta);
@@ -477,16 +485,13 @@ public class CollectionExecutor {
private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
- private final int superstep;
-
- public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, ExecutionConfig executionConfig) {
+ public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) {
super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
- this.superstep = superstep;
}
@Override
public int getSuperstepNumber() {
- return superstep;
+ return iterationSuperstep;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
new file mode 100644
index 0000000..ffe91d9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CollectionModeSuperstepITCase {
+
+ /**
+ * Dummy iteration to test that the supersteps are correctly incremented
+ * and can be retrieved from inside the updated and messaging functions.
+ * All vertices start with value 1 and increase their value by 1
+ * in each iteration.
+ */
+ @Test
+ public void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+ VertexCentricIteration<Long, Long, Long, Long> iteration =
+ graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
+ Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+
+ result.getVertices().map(
+ new VertexToTuple2Map<Long, Long>()).output(
+ new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ env.execute();
+ }
+
+ public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+ @Override
+ public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+ long superstep = getSuperstepNumber();
+ Assert.assertEquals(true, vertexValue == superstep);
+ setNewVertexValue(vertexValue + 1);
+ }
+ }
+
+ public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+ @Override
+ public void sendMessages(Long vertexId, Long vertexValue) {
+ long superstep = getSuperstepNumber();
+ Assert.assertEquals(true, vertexValue == superstep);
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertexValue);
+ }
+ }
+
+ public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+ public Long map(Vertex<Long, Long> value) {
+ return 1l;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index e83802c..18826b6 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -36,7 +36,6 @@ import java.util.NoSuchElementException;
import static org.junit.Assert.*;
-@SuppressWarnings("serial")
public class DegreesWithExceptionITCase {
private static final int PARALLELISM = 4;