You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 15:32:20 UTC

[2/2] flink git commit: [FLINK-1652] fixes superstep increment in CollectionExecutor

[FLINK-1652] fixes superstep increment in CollectionExecutor

This closes #464


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9077a53b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9077a53b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9077a53b

Branch: refs/heads/master
Commit: 9077a53bf536e96f04a818f029ddc6cf4a674fe4
Parents: e795c43
Author: vasia <va...@gmail.com>
Authored: Mon Mar 9 00:11:20 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 16:21:14 2015 +0200

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    | 21 +++--
 .../test/CollectionModeSuperstepITCase.java     | 87 ++++++++++++++++++++
 .../operations/DegreesWithExceptionITCase.java  |  1 -
 3 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 2f9ae9a..78ad930 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -71,6 +71,8 @@ public class CollectionExecutor {
 	
 	private final ExecutionConfig executionConfig;
 
+	private int iterationSuperstep;
+
 	// --------------------------------------------------------------------------------------------
 	
 	public CollectionExecutor(ExecutionConfig executionConfig) {
@@ -183,7 +185,7 @@ public class CollectionExecutor {
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -225,7 +227,7 @@ public class CollectionExecutor {
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -279,7 +281,10 @@ public class CollectionExecutor {
 			
 			// set the input to the current partial solution
 			this.intermediateResults.put(iteration.getPartialSolution(), currentResult);
-			
+
+			// set the superstep number
+			iterationSuperstep = superstep;
+
 			// grab the current iteration result
 			currentResult = (List<T>) execute(iteration.getNextPartialSolution(), superstep);
 
@@ -373,6 +378,9 @@ public class CollectionExecutor {
 			this.intermediateResults.put(iteration.getSolutionSet(), currentSolution);
 			this.intermediateResults.put(iteration.getWorkset(), currentWorkset);
 
+			// set the superstep number
+			iterationSuperstep = superstep;
+
 			// grab the current iteration result
 			List<T> solutionSetDelta = (List<T>) execute(iteration.getSolutionSetDelta(), superstep);
 			this.intermediateResults.put(iteration.getSolutionSetDelta(), solutionSetDelta);
@@ -477,16 +485,13 @@ public class CollectionExecutor {
 	
 	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
-		private final int superstep;
-
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, ExecutionConfig executionConfig) {
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) {
 			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
-			this.superstep = superstep;
 		}
 
 		@Override
 		public int getSuperstepNumber() {
-			return superstep;
+			return iterationSuperstep;
 		}
 
 		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
new file mode 100644
index 0000000..ffe91d9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CollectionModeSuperstepITCase {
+
+	/**
+	 * Dummy iteration to test that the supersteps are correctly incremented
+	 * and can be retrieved from inside the update and messaging functions.
+	 * All vertices start with value 1 and increase their value by 1
+	 * in each iteration. 
+	 */
+	@Test
+	public void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+		
+		VertexCentricIteration<Long, Long, Long, Long> iteration = 
+				graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+
+		result.getVertices().map(
+				new VertexToTuple2Map<Long, Long>()).output(
+						new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		env.execute();
+	}
+	
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long superstep = getSuperstepNumber();
+			Assert.assertEquals(true, vertexValue == superstep);
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			long superstep = getSuperstepNumber();
+			Assert.assertEquals(true, vertexValue == superstep);
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		public Long map(Vertex<Long, Long> value) {
+			return 1l;
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index e83802c..18826b6 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -36,7 +36,6 @@ import java.util.NoSuchElementException;
 
 import static org.junit.Assert.*;
 
-@SuppressWarnings("serial")
 public class DegreesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;