You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/10 15:52:00 UTC
[1/2] flink git commit: [FLINK-5910] [gelly] Framework for Gelly
examples
Repository: flink
Updated Branches:
refs/heads/master 206ea2119 -> 70e78a620
[FLINK-5910] [gelly] Framework for Gelly examples
Driver jobs are composed of an input, an algorithm driver, and an
output. Create the interfaces for inputs, drivers, and outputs.
This closes #3431
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e78a62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e78a62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e78a62
Branch: refs/heads/master
Commit: 70e78a620df503f06e298dd5537f24a56a8cc866
Parents: 694794e
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Feb 28 12:20:42 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 10 09:48:02 2017 -0500
----------------------------------------------------------------------
.../org/apache/flink/graph/drivers/Driver.java | 66 ++++++++++++++++++++
.../apache/flink/graph/drivers/input/Input.java | 51 +++++++++++++++
.../apache/flink/graph/drivers/output/CSV.java | 34 ++++++++++
.../apache/flink/graph/drivers/output/Hash.java | 33 ++++++++++
.../flink/graph/drivers/output/Print.java | 33 ++++++++++
.../graph/drivers/parameter/Parameterized.java | 51 +++++++++++++++
6 files changed, 268 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
new file mode 100644
index 0000000..b001875
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+
+/**
+ * A driver for one or more {@link GraphAlgorithm}s and/or
+ * {@link GraphAnalytic}s.
+ *
+ * It is preferable to include multiple, overlapping algorithms/analytics in
+ * the same driver both for simplicity and since this examples module
+ * demonstrates Flink capabilities rather than absolute performance.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public interface Driver<K, VV, EV>
+extends Parameterized {
+
+ /**
+ * A one-line description, presented in the algorithm listing.
+ *
+ * @return short description
+ */
+ String getShortDescription();
+
+ /**
+ * A multi-line description, presented in the algorithm usage.
+ *
+ * @return long description
+ */
+ String getLongDescription();
+
+ /**
+ * "Run" algorithms and analytics on the input graph. The execution plan
+ * is not finalized here but in the output methods.
+ *
+ * Drivers are first configured, next planned, and finally the chosen
+ * output method is called.
+ *
+ * @param graph input graph
+ * @throws Exception on error
+ */
+ void plan(Graph<K, VV, EV> graph) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
new file mode 100644
index 0000000..d647dd6
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/Input.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.graph.generator.GraphGenerator;
+
+/**
+ * Input source for a {@link Graph}, for example a file reader or
+ * {@link GraphGenerator}.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public interface Input<K, VV, EV>
+extends Parameterized {
+
+ /**
+ * A human-readable identifier summarizing the input and configuration.
+ *
+ * @return the unique identifier
+ */
+ String getIdentity();
+
+ /**
+ * Create the input {@link Graph}.
+ *
+ * @param env the ExecutionEnvironment
+ * @return the input Graph
+ */
+ Graph<K, VV, EV> create(ExecutionEnvironment env) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
new file mode 100644
index 0000000..5d1faeb
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Write algorithm output to file using CSV format.
+ */
+public interface CSV {
+
+ /**
+ * Write execution results to file using CSV format.
+ *
+ * @param filename output filename
+ * @param lineDelimiter CSV delimiter between lines
+ * @param fieldDelimiter CSV delimiter between fields
+ */
+ void writeCSV(String filename, String lineDelimiter, String fieldDelimiter);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
new file mode 100644
index 0000000..e1c399e
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Print hash of algorithm output.
+ */
+public interface Hash {
+
+ /**
+ * Print hash of execution results.
+ *
+ * @param executionName job name
+ * @throws Exception on error
+ */
+ void hash(String executionName) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
new file mode 100644
index 0000000..be421b0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+/**
+ * Print algorithm output.
+ */
+public interface Print {
+
+ /**
+ * Print execution results.
+ *
+ * @param executionName job name
+ * @throws Exception on error
+ */
+ void print(String executionName) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70e78a62/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
new file mode 100644
index 0000000..b24f8cf
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+
+/**
+ * A configurable command-line choice, such as an input or algorithm.
+ */
+public interface Parameterized {
+
+ /**
+ * A unique, human-readable identifier. Presented to the user as the
+ * name of a selectable choice.
+ *
+ * @return parameter name
+ */
+ String getName();
+
+ /**
+ * Human-readable format for the command-line usage string.
+ *
+ * @return command-line documentation string
+ */
+ String getParameterization();
+
+ /**
+ * Read parameter values from the command-line arguments.
+ *
+ * @param parameterTool parameter parser
+ * @throws ProgramParametrizationException when configuration is invalid
+ */
+ void configure(ParameterTool parameterTool) throws ProgramParametrizationException;
+}
[2/2] flink git commit: [FLINK-5890] [gelly] GatherSumApply broken
when object reuse enabled
Posted by gr...@apache.org.
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
GatherSumApplyIteration uses reduce and join for which extra care must
be taken when object reuse is enabled. Adds a check for objects returned
by the user to prevent system objects from being overwritten.
This closes #3402
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/694794eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/694794eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/694794eb
Branch: refs/heads/master
Commit: 694794eb6cbb63dace5a3389a99878f952f0faa5
Parents: 206ea21
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Feb 23 08:47:48 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 10 09:48:02 2017 -0500
----------------------------------------------------------------------
.../graph/examples/ConnectedComponents.java | 2 +-
.../data/ConnectedComponentsDefaultData.java | 4 +-
.../flink/graph/test/GatherSumApplyITCase.java | 50 +++++++++++++-------
.../translate/translators/LongToLongValue.java | 40 ++++++++++++++++
.../apache/flink/graph/gsa/ApplyFunction.java | 6 ++-
.../graph/gsa/GatherSumApplyIteration.java | 9 ++++
.../apache/flink/graph/utils/GraphUtils.java | 12 +++++
7 files changed, 101 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
index 835703b..6651739 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
@@ -18,7 +18,6 @@
package org.apache.flink.graph.examples;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
import org.apache.flink.graph.library.GSAConnectedComponents;
import org.apache.flink.types.NullValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
index c53f5ba..d9fb5cc 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
@@ -44,9 +44,9 @@ public class ConnectedComponentsDefaultData {
};
public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
- List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>();
+ List<Edge<Long, NullValue>> edgeList = new LinkedList<>();
for (Object[] edge : DEFAULT_EDGES) {
- edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance()));
+ edgeList.add(new Edge<>((long)edge[0], (long)edge[1], NullValue.getInstance()));
}
return env.fromCollection(edgeList);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 5ce2e28..19cf677 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -19,14 +19,20 @@
package org.apache.flink.graph.test;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.translate.Translate;
+import org.apache.flink.graph.asm.translate.translators.LongToLongValue;
import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
import org.apache.flink.graph.library.GSAConnectedComponents;
import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -41,29 +47,46 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String expectedResult;
-
// --------------------------------------------------------------------------------------------
// Connected Components Test
// --------------------------------------------------------------------------------------------
+ private String expectedResultCC = "1,1\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,1\n";
+
@Test
- public void testConnectedComponents() throws Exception {
+ public void testConnectedComponentsWithObjectReuseDisabled() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableObjectReuse();
Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
- new InitMapperCC(), env);
+ new IdentityMapper<Long>(), env);
List<Vertex<Long, Long>> result = inputGraph.run(
new GSAConnectedComponents<Long, Long, NullValue>(16)).collect();
- expectedResult = "1,1\n" +
- "2,1\n" +
- "3,1\n" +
- "4,1\n";
+ compareResultAsTuples(result, expectedResultCC);
+ }
- compareResultAsTuples(result, expectedResult);
+ @Test
+ public void testConnectedComponentsWithObjectReuseEnabled() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+
+ DataSet<Edge<LongValue, NullValue>> edges = Translate.translateEdgeIds(
+ ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+ new LongToLongValue());
+
+ Graph<LongValue, LongValue, NullValue> inputGraph = Graph.fromDataSet(
+ edges, new IdentityMapper<LongValue>(), env);
+
+ List<Vertex<LongValue, LongValue>> result = inputGraph.run(
+ new GSAConnectedComponents<LongValue, LongValue, NullValue>(16)).collect();
+
+ compareResultAsTuples(result, expectedResultCC);
}
// --------------------------------------------------------------------------------------------
@@ -81,7 +104,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
List<Vertex<Long, Double>> result = inputGraph.run(
new GSASingleSourceShortestPaths<Long, NullValue>(1L, 16)).collect();
- expectedResult = "1,0.0\n" +
+ String expectedResult = "1,0.0\n" +
"2,12.0\n" +
"3,13.0\n" +
"4,47.0\n" +
@@ -91,13 +114,6 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class InitMapperCC implements MapFunction<Long, Long> {
- public Long map(Long value) {
- return value;
- }
- }
-
- @SuppressWarnings("serial")
private static final class InitMapperSSSP implements MapFunction<Long, NullValue> {
public NullValue map(Long value) {
return NullValue.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
new file mode 100644
index 0000000..da79562
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongToLongValue.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link Long} to {@link LongValue}.
+ */
+public class LongToLongValue
+implements TranslateFunction<Long, LongValue> {
+
+ @Override
+ public LongValue translate(Long value, LongValue reuse)
+ throws Exception {
+ if (reuse == null) {
+ reuse = new LongValue();
+ }
+
+ reuse.setValue(value);
+ return reuse;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index f05c254..19d08a5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -137,7 +137,9 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
private Collector<Vertex<K, VV>> out;
- private Vertex<K, VV> outVal;
+ // use a local vertex instance so that the user does not overwrite a system
+ // instance used by JoinDriver
+ private Vertex<K, VV> outVal = new Vertex<>();
public void init(IterationRuntimeContext iterationRuntimeContext) {
this.runtimeContext = iterationRuntimeContext;
@@ -145,7 +147,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
this.out = out;
- this.outVal = vertex;
+ this.outVal.f0 = vertex.f0;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 442ce68..e941b7b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -332,6 +332,15 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
K key = arg0.f0;
M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+
+ // if the user returns value from the right argument then swap as
+ // in ReduceDriver.run()
+ if (result == arg1.f1) {
+ M tmp = arg1.f1;
+ arg1.f1 = arg0.f1;
+ arg0.f1 = tmp;
+ }
+
return new Tuple2<>(key, result);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/694794eb/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 5b9e18f..2e0dffc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -44,6 +44,18 @@ public class GraphUtils {
}
/**
+ * The identity mapper returns the input as output.
+ *
+ * @param <T> element type
+ */
+ public static final class IdentityMapper<T>
+ implements MapFunction<T, T> {
+ public T map(T value) {
+ return value;
+ }
+ }
+
+ /**
* Map each element to a value.
*
* @param <I> input type