You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/10 15:52:00 UTC

[1/2] flink git commit: [FLINK-5910] [gelly] Framework for Gelly examples

Repository: flink
Updated Branches:
  refs/heads/master 206ea2119 -> 70e78a620


[FLINK-5910] [gelly] Framework for Gelly examples

Driver jobs are composed of an input, an algorithm driver, and an
output. Create the interfaces for inputs, drivers, and outputs.

This closes #3431


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e78a62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e78a62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e78a62

Branch: refs/heads/master
Commit: 70e78a620df503f06e298dd5537f24a56a8cc866
Parents: 694794e
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Feb 28 12:20:42 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 10 09:48:02 2017 -0500

----------------------------------------------------------------------
 .../org/apache/flink/graph/drivers/Driver.java  | 66 ++++++++++++++++++++
 .../apache/flink/graph/drivers/input/Input.java | 51 +++++++++++++++
 .../apache/flink/graph/drivers/output/CSV.java  | 34 ++++++++++
 .../apache/flink/graph/drivers/output/Hash.java | 33 ++++++++++
 .../flink/graph/drivers/output/Print.java       | 33 ++++++++++
 .../graph/drivers/parameter/Parameterized.java  | 51 +++++++++++++++
 6 files changed, 268 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
new file mode 100644
index 0000000..b001875
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+
+/**
+ * A driver for one or more {@link GraphAlgorithm}s and/or
+ * {@link GraphAnalytic}s.
+ *
+ * It is preferable to include multiple, overlapping algorithms/analytics in
+ * the same driver both for simplicity and since this examples module
+ * demonstrates Flink capabilities rather than absolute performance.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public interface Driver<K, VV, EV>
+extends Parameterized {
+
+	/**
+	 * A one-line description, presented in the algorithm listing.
+	 *
+	 * @return short description
+	 */
+	String getShortDescription();
+
+	/**
+	 * A multi-line description, presented in the algorithm usage.
+	 *
+	 * @return long description
+	 */
+	String getLongDescription();
+
+	/**
+	 * "Run" algorithms and analytics on the input graph. The execution plan
+	 * is not finalized here but in the output methods.
+	 *
+	 * Drivers are first configured, next planned, and finally the chosen
+	 * output method is called.
+	 *
+	 * @param graph input graph
+	 * @throws Exception on error
+	 */
+	void plan(Graph<K, VV, EV> graph) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
new file mode 100644
index 0000000..d647dd6
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.graph.generator.GraphGenerator;
+
+/**
+ * Input source for a {@link Graph}, for example a file reader or
+ * {@link GraphGenerator}.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public interface Input<K, VV, EV>
+extends Parameterized {
+
+	/**
+	 * A human-readable identifier summarizing the input and configuration.
+	 *
+	 * @return the unique identifier
+	 */
+	String getIdentity();
+
+	/**
+	 * Create the input {@link Graph}.
+	 *
+	 * @param env the ExecutionEnvironment
+	 * @return the input Graph
+	 */
+	Graph<K, VV, EV> create(ExecutionEnvironment env) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
new file mode 100644
index 0000000..5d1faeb
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Write algorithm output to file using CSV format.
+ */
+public interface CSV {
+
+	/**
+	 * Write execution results to file using CSV format.
+	 *
+	 * @param filename output filename
+	 * @param lineDelimiter CSV delimiter between lines
+	 * @param fieldDelimiter CSV delimiter between fields
+	 */
+	void writeCSV(String filename, String lineDelimiter, String fieldDelimiter);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
new file mode 100644
index 0000000..e1c399e
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Print hash of algorithm output.
+ */
+public interface Hash {
+
+	/**
+	 * Print hash of execution results.
+	 *
+	 * @param executionName job name
+	 * @throws Exception on error
+	 */
+	void hash(String executionName) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
new file mode 100644
index 0000000..be421b0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Print algorithm output.
+ */
+public interface Print {
+
+	/**
+	 * Print execution results.
+	 *
+	 * @param executionName job name
+	 * @throws Exception on error
+	 */
+	void print(String executionName) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
new file mode 100644
index 0000000..b24f8cf
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+
+/**
+ * A configurable command-line choice, such as an input or algorithm.
+ */
+public interface Parameterized {
+
+	/**
+	 * A unique, human-readable identifier. Presented to the user as the
+	 * name of a selectable choice.
+	 *
+	 * @return parameter name
+	 */
+	String getName();
+
+	/**
+	 * Human-readable format for the command-line usage string.
+	 *
+	 * @return command-line documentation string
+	 */
+	String getParameterization();
+
+	/**
+	 * Read parameter values from the command-line arguments.
+	 *
+	 * @param parameterTool parameter parser
+	 * @throws ProgramParametrizationException when configuration is invalid
+	 */
+	void configure(ParameterTool parameterTool) throws ProgramParametrizationException;
+}


[2/2] flink git commit: [FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

Posted by gr...@apache.org.
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

GatherSumApplyIteration uses reduce and join, for which extra care must
be taken when object reuse is enabled. Adds a check for objects returned
by the user to prevent system objects from being overwritten.

This closes #3402


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/694794eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/694794eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/694794eb

Branch: refs/heads/master
Commit: 694794eb6cbb63dace5a3389a99878f952f0faa5
Parents: 206ea21
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Feb 23 08:47:48 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 10 09:48:02 2017 -0500

----------------------------------------------------------------------
 .../graph/examples/ConnectedComponents.java     |  2 +-
 .../data/ConnectedComponentsDefaultData.java    |  4 +-
 .../flink/graph/test/GatherSumApplyITCase.java  | 50 +++++++++++++-------
 .../translate/translators/LongToLongValue.java  | 40 ++++++++++++++++
 .../apache/flink/graph/gsa/ApplyFunction.java   |  6 ++-
 .../graph/gsa/GatherSumApplyIteration.java      |  9 ++++
 .../apache/flink/graph/utils/GraphUtils.java    | 12 +++++
 7 files changed, 101 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
index 835703b..6651739 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.examples;
 
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
 import org.apache.flink.graph.library.GSAConnectedComponents;
 import org.apache.flink.types.NullValue;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
index c53f5ba..d9fb5cc 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
@@ -44,9 +44,9 @@ public class ConnectedComponentsDefaultData {
 	};
 
 	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>();
+		List<Edge<Long, NullValue>> edgeList = new LinkedList<>();
 		for (Object[] edge : DEFAULT_EDGES) {
-			edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance()));
+			edgeList.add(new Edge<>((long)edge[0], (long)edge[1], NullValue.getInstance()));
 		}
 		return env.fromCollection(edgeList);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 5ce2e28..19cf677 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -19,14 +19,20 @@
 package org.apache.flink.graph.test;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.translate.Translate;
+import org.apache.flink.graph.asm.translate.translators.LongToLongValue;
 import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
 import org.apache.flink.graph.library.GSAConnectedComponents;
 import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -41,29 +47,46 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String expectedResult;
-
 	// --------------------------------------------------------------------------------------------
 	//  Connected Components Test
 	// --------------------------------------------------------------------------------------------
 
+	private String expectedResultCC = "1,1\n" +
+		"2,1\n" +
+		"3,1\n" +
+		"4,1\n";
+
 	@Test
-	public void testConnectedComponents() throws Exception {
+	public void testConnectedComponentsWithObjectReuseDisabled() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableObjectReuse();
 
 		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
 			ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
-			new InitMapperCC(), env);
+			new IdentityMapper<Long>(), env);
 
 		List<Vertex<Long, Long>> result = inputGraph.run(
 			new GSAConnectedComponents<Long, Long, NullValue>(16)).collect();
 
-		expectedResult = "1,1\n" +
-			"2,1\n" +
-			"3,1\n" +
-			"4,1\n";
+		compareResultAsTuples(result, expectedResultCC);
+	}
 
-		compareResultAsTuples(result, expectedResult);
+	@Test
+	public void testConnectedComponentsWithObjectReuseEnabled() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		DataSet<Edge<LongValue, NullValue>> edges = Translate.translateEdgeIds(
+			ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+			new LongToLongValue());
+
+		Graph<LongValue, LongValue, NullValue> inputGraph = Graph.fromDataSet(
+			edges, new IdentityMapper<LongValue>(), env);
+
+		List<Vertex<LongValue, LongValue>> result = inputGraph.run(
+			new GSAConnectedComponents<LongValue, LongValue, NullValue>(16)).collect();
+
+		compareResultAsTuples(result, expectedResultCC);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -81,7 +104,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 		List<Vertex<Long, Double>> result = inputGraph.run(
 			new GSASingleSourceShortestPaths<Long, NullValue>(1L, 16)).collect();
 
-		expectedResult = "1,0.0\n" +
+		String expectedResult = "1,0.0\n" +
 			"2,12.0\n" +
 			"3,13.0\n" +
 			"4,47.0\n" +
@@ -91,13 +114,6 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class InitMapperCC implements MapFunction<Long, Long> {
-		public Long map(Long value) {
-			return value;
-		}
-	}
-
-	@SuppressWarnings("serial")
 	private static final class InitMapperSSSP implements MapFunction<Long, NullValue> {
 		public NullValue map(Long value) {
 			return NullValue.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
new file mode 100644
index 0000000..da79562
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link Long} to {@link LongValue}.
+ */
+public class LongToLongValue
+implements TranslateFunction<Long, LongValue> {
+
+	@Override
+	public LongValue translate(Long value, LongValue reuse)
+			throws Exception {
+		if (reuse == null) {
+			reuse = new LongValue();
+		}
+
+		reuse.setValue(value);
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index f05c254..19d08a5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -137,7 +137,9 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
 	private Collector<Vertex<K, VV>> out;
 
-	private Vertex<K, VV> outVal;
+	// use a local vertex instance so that the user does not overwrite a system
+	// instance used by JoinDriver
+	private Vertex<K, VV> outVal = new Vertex<>();
 
 	public void init(IterationRuntimeContext iterationRuntimeContext) {
 		this.runtimeContext = iterationRuntimeContext;
@@ -145,7 +147,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
 	public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
 		this.out = out;
-		this.outVal = vertex;
+		this.outVal.f0 = vertex.f0;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 442ce68..e941b7b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -332,6 +332,15 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
 			K key = arg0.f0;
 			M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+
+			// if the user returns the value from the right argument then swap
+			// the two arguments' values, as in ReduceDriver.run()
+			if (result == arg1.f1) {
+				M tmp = arg1.f1;
+				arg1.f1 = arg0.f1;
+				arg0.f1 = tmp;
+			}
+
 			return new Tuple2<>(key, result);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 5b9e18f..2e0dffc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -44,6 +44,18 @@ public class GraphUtils {
 	}
 
 	/**
+	 * The identity mapper returns the input as output.
+	 *
+	 * @param <T> element type
+	 */
+	public static final class IdentityMapper<T>
+	implements MapFunction<T, T> {
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	/**
 	 * Map each element to a value.
 	 *
 	 * @param <I> input type