You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/04 21:33:03 UTC
[1/7] flink git commit: [FLINK-1522][gelly] Added test for SSSP
Example
Repository: flink
Updated Branches:
refs/heads/master d8d642fd6 -> e1e03062c
[FLINK-1522][gelly] Added test for SSSP Example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f329e4ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f329e4ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f329e4ed
Branch: refs/heads/master
Commit: f329e4eda9482635a820093476803a73824d9019
Parents: d8d642f
Author: andralungu <lu...@gmail.com>
Authored: Fri Feb 20 20:11:42 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:01:36 2015 +0100
----------------------------------------------------------------------
.../SingleSourceShortestPathsExample.java | 99 ++++++++++++++++++--
.../utils/SingleSourceShortestPathsData.java | 70 ++++++++++++++
.../SingleSourceShortestPathsITCase.java | 78 +++++++++++++++
3 files changed, 238 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f329e4ed/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 7f31525..c590f30 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -19,27 +19,30 @@
package org.apache.flink.graph.example;
import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ExampleUtils;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.library.SingleSourceShortestPaths;
public class SingleSourceShortestPathsExample implements ProgramDescription {
- private static int maxIterations = 5;
-
public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (!parseParameters(args)) {
+ return;
+ }
- DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env);
+ DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
- Long srcVertexId = 1L;
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
@@ -47,13 +50,91 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
.run(new SingleSourceShortestPaths<Long>(srcVertexId,
maxIterations)).getVertices();
- singleSourceShortestPaths.print();
+ // emit result
+ if (fileOutput) {
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ singleSourceShortestPaths.print();
+ }
- env.execute();
+ env.execute("Single Source Shortest Paths Example");
}
@Override
public String getDescription() {
return "Single Source Shortest Paths";
}
+
+ // ******************************************************************************************************************
+ // UTIL METHODS
+ // ******************************************************************************************************************
+
+ private static boolean fileOutput = false;
+
+ private static Long srcVertexId = null;
+
+ private static String verticesInputPath = null;
+
+ private static String edgesInputPath = null;
+
+ private static String outputPath = null;
+
+ private static int maxIterations = 5;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ if (args.length == 5) {
+ fileOutput = true;
+ srcVertexId = Long.parseLong(args[0]);
+ verticesInputPath = args[1];
+ edgesInputPath = args[2];
+ outputPath = args[3];
+ maxIterations = Integer.parseInt(args[4]);
+ } else {
+ System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input vertices path> <input edges path> <output path> <num iterations>");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(verticesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class)
+ .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+
+ @Override
+ public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
+ return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
+ }
+ });
+ } else {
+ System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input vertices path> <input edges path> <output path> <num iterations>");
+ return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+
+ @Override
+ public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
+ return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
+ }
+ });
+ } else {
+ System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input vertices path> <input edges path> <output path> <num iterations>");
+ return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f329e4ed/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
new file mode 100644
index 0000000..7e5445f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SingleSourceShortestPathsData {
+
+ public static final int NUM_VERTICES = 5;
+
+ public static final Long SRC_VERTEX_ID = 1L;
+
+ public static final String VERTICES = "1,1.0\n" + "2,2.0\n" + "3,3.0\n" + "4,4.0\n" + "5,5.0";
+
+ public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+ vertices.add(new Vertex<Long, Double>(1L, 1.0));
+ vertices.add(new Vertex<Long, Double>(2L, 2.0));
+ vertices.add(new Vertex<Long, Double>(3L, 3.0));
+ vertices.add(new Vertex<Long, Double>(4L, 4.0));
+ vertices.add(new Vertex<Long, Double>(5L, 5.0));
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final String EDGES = "1,2,12.0\n" + "1,3,13.0\n" + "2,3,23.0\n" + "3,4,34.0\n" + "3,5,35.0\n" +
+ "4,5,45.0\n" + "5,1,51.0";
+
+ public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
+ edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
+ edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
+ edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
+ edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
+ edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0.0\n" + "2,12.0\n" + "3,13.0\n" +
+ "4,47.0\n" + "5,48.0";
+
+ private SingleSourceShortestPathsData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f329e4ed/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
new file mode 100644
index 0000000..761d71f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.SingleSourceShortestPathsExample;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
+
+ private String verticesPath;
+
+ private String edgesPath;
+
+ private String resultPath;
+
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public SingleSourceShortestPathsITCase(ExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ File verticesFile = tempFolder.newFile();
+ Files.write(SingleSourceShortestPathsData.VERTICES, verticesFile, Charsets.UTF_8);
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
+
+ verticesPath = verticesFile.toURI().toString();
+ edgesPath = edgesFile.toURI().toString();
+ }
+
+ @Test
+ public void testSSSPExample() throws Exception {
+ SingleSourceShortestPathsExample.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+ verticesPath, edgesPath, resultPath, SingleSourceShortestPathsData.NUM_VERTICES + ""});
+ expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+}
\ No newline at end of file
[7/7] flink git commit: [gelly] refactored tests;
removed duplicate data from TestGraphUtils
Posted by va...@apache.org.
[gelly] refactored tests; removed duplicate data from TestGraphUtils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1e03062
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1e03062
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1e03062
Branch: refs/heads/master
Commit: e1e03062ccab7db0534a866fa5a984095e2b5eef
Parents: b529b62
Author: vasia <va...@gmail.com>
Authored: Wed Mar 4 15:49:00 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:31:56 2015 +0100
----------------------------------------------------------------------
.../graph/example/LabelPropagationExample.java | 9 +-
.../SingleSourceShortestPathsExample.java | 4 +-
.../apache/flink/graph/test/DegreesITCase.java | 171 ------
.../flink/graph/test/FromCollectionITCase.java | 120 -----
.../flink/graph/test/GraphCreationITCase.java | 170 ------
.../test/GraphCreationWithMapperITCase.java | 158 ------
.../flink/graph/test/GraphMutationsITCase.java | 273 ----------
.../flink/graph/test/GraphOperationsITCase.java | 267 ----------
.../flink/graph/test/JoinWithEdgesITCase.java | 519 ------------------
.../graph/test/JoinWithVerticesITCase.java | 218 --------
.../test/LabelPropagationExampleITCase.java | 176 -------
.../apache/flink/graph/test/MapEdgesITCase.java | 223 --------
.../flink/graph/test/MapVerticesITCase.java | 233 ---------
.../graph/test/ReduceOnEdgesMethodsITCase.java | 317 -----------
.../test/ReduceOnNeighborMethodsITCase.java | 303 -----------
.../apache/flink/graph/test/TestGraphUtils.java | 62 ++-
.../example/LabelPropagationExampleITCase.java | 143 +++++
.../graph/test/operations/DegreesITCase.java | 172 ++++++
.../test/operations/FromCollectionITCase.java | 121 +++++
.../test/operations/GraphCreationITCase.java | 171 ++++++
.../GraphCreationWithMapperITCase.java | 159 ++++++
.../test/operations/GraphMutationsITCase.java | 274 ++++++++++
.../test/operations/GraphOperationsITCase.java | 268 ++++++++++
.../test/operations/JoinWithEdgesITCase.java | 520 +++++++++++++++++++
.../test/operations/JoinWithVerticesITCase.java | 219 ++++++++
.../graph/test/operations/MapEdgesITCase.java | 224 ++++++++
.../test/operations/MapVerticesITCase.java | 234 +++++++++
.../operations/ReduceOnEdgesMethodsITCase.java | 318 ++++++++++++
.../ReduceOnNeighborMethodsITCase.java | 304 +++++++++++
29 files changed, 3174 insertions(+), 3176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index 78cb5d5..e399b3f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -85,8 +85,8 @@ public class LabelPropagationExample implements ProgramDescription {
private static boolean parseParameters(String[] args) {
if(args.length > 0) {
- if(args.length != 5) {
- System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+ if(args.length != 4) {
+ System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
return false;
}
@@ -94,13 +94,12 @@ public class LabelPropagationExample implements ProgramDescription {
vertexInputPath = args[0];
edgeInputPath = args[1];
outputPath = args[2];
- numVertices = Integer.parseInt(args[3]);
- maxIterations = Integer.parseInt(args[4]);
+ maxIterations = Integer.parseInt(args[3]);
} else {
System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+ System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
}
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index c590f30..6c85397 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -100,6 +100,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
return true;
}
+ @SuppressWarnings("serial")
private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(verticesInputPath)
@@ -119,6 +120,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
}
}
+ @SuppressWarnings("serial")
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
@@ -128,7 +130,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
@Override
public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
- return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
+ return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
}
});
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
deleted file mode 100644
index 96a6d20..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DegreesITCase extends MultipleProgramsTestBase {
-
- public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testOutDegrees() throws Exception {
- /*
- * Test outDegrees()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.outDegrees().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,1\n" +
- "3,2\n" +
- "4,1\n" +
- "5,1\n";
- }
-
- @Test
- public void testOutDegreesWithNoOutEdges() throws Exception {
- /*
- * Test outDegrees() no outgoing edges
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
- graph.outDegrees().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,3\n" +
- "2,1\n" +
- "3,1\n" +
- "4,1\n" +
- "5,0\n";
- }
-
- @Test
- public void testInDegrees() throws Exception {
- /*
- * Test inDegrees()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.inDegrees().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,1\n" +
- "2,1\n" +
- "3,2\n" +
- "4,1\n" +
- "5,2\n";
- }
-
- @Test
- public void testInDegreesWithNoInEdge() throws Exception {
- /*
- * Test inDegrees() no ingoing edge
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
- graph.inDegrees().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,0\n" +
- "2,1\n" +
- "3,1\n" +
- "4,1\n" +
- "5,3\n";
- }
-
- @Test
- public void testGetDegrees() throws Exception {
- /*
- * Test getDegrees()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.getDegrees().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,3\n" +
- "2,2\n" +
- "3,4\n" +
- "4,2\n" +
- "5,3\n";
- }
-
- @Test
- public void testGetDegreesWithDisconnectedData() throws Exception {
- /*
- * Test getDegrees() with disconnected data
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, NullValue, Long> graph =
- Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-
- graph.outDegrees().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2\n" +
- "2,1\n" +
- "3,0\n" +
- "4,1\n" +
- "5,0\n";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
deleted file mode 100644
index 5259143..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class FromCollectionITCase extends MultipleProgramsTestBase {
-
- public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testFromCollectionVerticesEdges() throws Exception {
- /*
- * Test fromCollection(vertices, edges):
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
- TestGraphUtils.getLongLongEdges(), env);
-
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testFromCollectionEdgesNoInitialValue() throws Exception {
- /*
- * Test fromCollection(edges) with no initial value for the vertices
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
- env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,(null)\n" +
- "2,(null)\n" +
- "3,(null)\n" +
- "4,(null)\n" +
- "5,(null)\n";
- }
-
- @Test
- public void testFromCollectionEdgesWithInitialValue() throws Exception {
- /*
- * Test fromCollection(edges) with vertices initialised by a
- * function that takes the id and doubles it
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
- new InitVerticesMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2\n" +
- "2,4\n" +
- "3,6\n" +
- "4,8\n" +
- "5,10\n";
- }
-
- @SuppressWarnings("serial")
- private static final class InitVerticesMapper implements MapFunction<Long, Long> {
- public Long map(Long vertexId) {
- return vertexId * 2;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
deleted file mode 100644
index 4cbdd90..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphCreationITCase extends MultipleProgramsTestBase {
-
- public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testCreateWithoutVertexValues() throws Exception {
- /*
- * Test create() with edge dataset and no vertex values
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,(null)\n" +
- "2,(null)\n" +
- "3,(null)\n" +
- "4,(null)\n" +
- "5,(null)\n";
- }
-
- @Test
- public void testCreateWithMapper() throws Exception {
- /*
- * Test create() with edge dataset and a mapper that assigns the id as value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
- new AssignIdAsValueMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,1\n" +
- "2,2\n" +
- "3,3\n" +
- "4,4\n" +
- "5,5\n";
- }
-
- @Test
- public void testCreateWithCustomVertexValue() throws Exception {
- /*
- * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
- TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,(2.0,0)\n" +
- "2,(4.0,1)\n" +
- "3,(6.0,2)\n" +
- "4,(8.0,3)\n" +
- "5,(10.0,4)\n";
- }
-
- @Test
- public void testValidate() throws Exception {
- /*
- * Test validate():
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
- DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
- DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-
- result.writeAsText(resultPath);
- env.execute();
-
- expectedResult = "true\n";
- }
-
- @Test
- public void testValidateWithInvalidIds() throws Exception {
- /*
- * Test validate() - invalid vertex ids
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
- DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
- DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
- result.writeAsText(resultPath);
- env.execute();
-
- expectedResult = "false\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
- public Long map(Long vertexId) {
- return vertexId;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AssignCustomVertexValueMapper implements
- MapFunction<Long, DummyCustomParameterizedType<Double>> {
-
- DummyCustomParameterizedType<Double> dummyValue =
- new DummyCustomParameterizedType<Double>();
-
- public DummyCustomParameterizedType<Double> map(Long vertexId) {
- dummyValue.setIntField(vertexId.intValue()-1);
- dummyValue.setTField(vertexId*2.0);
- return dummyValue;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
deleted file mode 100644
index 24f7c82..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
-
- public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testWithDoubleValueMapper() throws Exception {
- /*
- * Test create() with edge dataset and a mapper that assigns a double constant as value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
- new AssignDoubleValueMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,0.1\n" +
- "2,0.1\n" +
- "3,0.1\n" +
- "4,0.1\n" +
- "5,0.1\n";
- }
-
- @Test
- public void testWithTuple2ValueMapper() throws Exception {
- /*
- * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
- TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,(2,42)\n" +
- "2,(4,42)\n" +
- "3,(6,42)\n" +
- "4,(8,42)\n" +
- "5,(10,42)\n";
- }
-
- @Test
- public void testWithConstantValueMapper() throws Exception {
- /*
- * Test create() with edge dataset with String key type
- * and a mapper that assigns a double constant as value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
- new AssignDoubleConstantMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,0.1\n" +
- "2,0.1\n" +
- "3,0.1\n" +
- "4,0.1\n" +
- "5,0.1\n";
- }
-
- @Test
- public void testWithDCustomValueMapper() throws Exception {
- /*
- * Test create() with edge dataset and a mapper that assigns a custom vertex value
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
- TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
-
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,(F,0)\n" +
- "2,(F,1)\n" +
- "3,(F,2)\n" +
- "4,(F,3)\n" +
- "5,(F,4)\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
- public Double map(Long value) {
- return 0.1d;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> map(Long vertexId) {
- return new Tuple2<Long, Long>(vertexId*2, 42l);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
- public Double map(String value) {
- return 0.1d;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
- public DummyCustomType map(Long vertexId) {
- return new DummyCustomType(vertexId.intValue()-1, false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
deleted file mode 100644
index 3af8943..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphMutationsITCase extends MultipleProgramsTestBase {
-
- public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testAddVertex() throws Exception {
- /*
- * Test addVertex() -- simple case
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
- edges.add(new Edge<Long, Long>(6L, 1L, 61L));
- graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n" +
- "6,1,61\n";
- }
-
- @Test
- public void testAddVertexExisting() throws Exception {
- /*
- * Test addVertex() -- add an existing vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
- edges.add(new Edge<Long, Long>(1L, 5L, 15L));
- graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges);
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "1,5,15\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testAddVertexNoEdges() throws Exception {
- /*
- * Test addVertex() -- add vertex with empty edge set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
- graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
- graph.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,1\n" +
- "2,2\n" +
- "3,3\n" +
- "4,4\n" +
- "5,5\n" +
- "6,6\n";
- }
-
- @Test
- public void testRemoveVertex() throws Exception {
- /*
- * Test removeVertex() -- simple case
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n";
- }
-
- @Test
- public void testRemoveInvalidVertex() throws Exception {
- /*
- * Test removeVertex() -- remove an invalid vertex
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testAddEdge() throws Exception {
- /*
- * Test addEdge() -- simple case
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
- 61L);
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n" +
- "6,1,61\n";
- }
-
- @Test
- public void testAddExistingEdge() throws Exception {
- /*
- * Test addEdge() -- add already existing edge
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
- 12L);
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testRemoveVEdge() throws Exception {
- /*
- * Test removeEdge() -- simple case
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n";
- }
-
- @Test
- public void testRemoveInvalidEdge() throws Exception {
- /*
- * Test removeEdge() -- invalid edge
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
- graph.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
deleted file mode 100644
index f194a60..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphOperationsITCase extends MultipleProgramsTestBase {
-
- public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testUndirected() throws Exception {
- /*
- * Test getUndirected()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.getUndirected().getEdges().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2,12\n" + "2,1,12\n" +
- "1,3,13\n" + "3,1,13\n" +
- "2,3,23\n" + "3,2,23\n" +
- "3,4,34\n" + "4,3,34\n" +
- "3,5,35\n" + "5,3,35\n" +
- "4,5,45\n" + "5,4,45\n" +
- "5,1,51\n" + "1,5,51\n";
- }
-
- @Test
- public void testReverse() throws Exception {
- /*
- * Test reverse()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- graph.reverse().getEdges().writeAsCsv(resultPath);
- env.execute();
- expectedResult = "2,1,12\n" +
- "3,1,13\n" +
- "3,2,23\n" +
- "4,3,34\n" +
- "5,3,35\n" +
- "5,4,45\n" +
- "1,5,51\n";
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testSubGraph() throws Exception {
- /*
- * Test subgraph:
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
- public boolean filter(Vertex<Long, Long> vertex) throws Exception {
- return (vertex.getValue() > 2);
- }
- },
- new FilterFunction<Edge<Long, Long>>() {
- public boolean filter(Edge<Long, Long> edge) throws Exception {
- return (edge.getValue() > 34);
- }
- }).getEdges().writeAsCsv(resultPath);
-
- env.execute();
- expectedResult = "3,5,35\n" +
- "4,5,45\n";
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testFilterVertices() throws Exception {
- /*
- * Test filterOnVertices:
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
- public boolean filter(Vertex<Long, Long> vertex) throws Exception {
- return (vertex.getValue() > 2);
- }
- }).getEdges().writeAsCsv(resultPath);
-
- env.execute();
- expectedResult = "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n";
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testFilterEdges() throws Exception {
- /*
- * Test filterOnEdges:
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
- public boolean filter(Edge<Long, Long> edge) throws Exception {
- return (edge.getValue() > 34);
- }
- }).getEdges().writeAsCsv(resultPath);
-
- env.execute();
- expectedResult = "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testNumberOfVertices() throws Exception {
- /*
- * Test numberOfVertices()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.numberOfVertices().writeAsText(resultPath);
-
- env.execute();
- expectedResult = "5";
- }
-
- @Test
- public void testNumberOfEdges() throws Exception {
- /*
- * Test numberOfEdges()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.numberOfEdges().writeAsText(resultPath);
-
- env.execute();
- expectedResult = "7";
- }
-
- @Test
- public void testVertexIds() throws Exception {
- /*
- * Test getVertexIds()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.getVertexIds().writeAsText(resultPath);
-
- env.execute();
- expectedResult = "1\n2\n3\n4\n5\n";
- }
-
- @Test
- public void testEdgesIds() throws Exception {
- /*
- * Test getEdgeIds()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
- graph.getEdgeIds().writeAsCsv(resultPath);
-
- env.execute();
- expectedResult = "1,2\n" + "1,3\n" +
- "2,3\n" + "3,4\n" +
- "3,5\n" + "4,5\n" +
- "5,1\n";
- }
-
- @Test
- public void testUnion() throws Exception {
- /*
- * Test union()
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
- List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-
- vertices.add(new Vertex<Long, Long>(6L, 6L));
- edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-
- graph = graph.union(Graph.fromCollection(vertices, edges, env));
-
- graph.getEdges().writeAsCsv(resultPath);
-
- env.execute();
-
- expectedResult = "1,2,12\n" +
- "1,3,13\n" +
- "2,3,23\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n" +
- "6,1,61\n";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
deleted file mode 100644
index 6f4f6a8..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
-
- public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testWithEdgesInputDataset() throws Exception {
- /*
- * Test joinWithEdges with the input DataSet parameter identical
- * to the edge DataSet
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
- .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,68\n" +
- "3,5,70\n" +
- "4,5,90\n" +
- "5,1,102\n";
- }
-
- @Test
- public void testWithLessElements() throws Exception {
- /*
- * Test joinWithEdges with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet, but of the same type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
- .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testWithLessElementsDifferentType() throws Exception {
- /*
- * Test joinWithEdges with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet and of a different type(Boolean)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
- .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testWithNoCommonKeys() throws Exception {
- /*
- * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
- * - the iterator becomes empty.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
- new DoubleValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,68\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testWithCustomType() throws Exception {
- /*
- * Test joinWithEdges with a DataSet containing custom parametrised type input values
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
- new CustomValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,10\n" +
- "1,3,20\n" +
- "2,3,30\n" +
- "3,4,40\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testWithEdgesOnSource() throws Exception {
- /*
- * Test joinWithEdgesOnSource with the input DataSet parameter identical
- * to the edge DataSet
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
- .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,25\n" +
- "2,3,46\n" +
- "3,4,68\n" +
- "3,5,69\n" +
- "4,5,90\n" +
- "5,1,102\n";
- }
-
- @Test
- public void testOnSourceWithLessElements() throws Exception {
- /*
- * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet, but of the same type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
- .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,25\n" +
- "2,3,46\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testOnSourceWithDifferentType() throws Exception {
- /*
- * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet and of a different type(Boolean)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
- .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testOnSourceWithNoCommonKeys() throws Exception {
- /*
- * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
- * - the iterator becomes empty.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
- new DoubleValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,20\n" +
- "1,3,20\n" +
- "2,3,60\n" +
- "3,4,80\n" +
- "3,5,80\n" +
- "4,5,120\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testOnSourceWithCustom() throws Exception {
- /*
- * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
- new CustomValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,10\n" +
- "1,3,10\n" +
- "2,3,30\n" +
- "3,4,40\n" +
- "3,5,40\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testWithEdgesOnTarget() throws Exception {
- /*
- * Test joinWithEdgesOnTarget with the input DataSet parameter identical
- * to the edge DataSet
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
- .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,36\n" +
- "3,4,68\n" +
- "3,5,70\n" +
- "4,5,80\n" +
- "5,1,102\n";
- }
-
- @Test
- public void testWithOnTargetWithLessElements() throws Exception {
- /*
- * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet, but of the same type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
- .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,36\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testOnTargetWithDifferentType() throws Exception {
- /*
- * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
- * less elements than the edge DataSet and of a different type(Boolean)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
- .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,24\n" +
- "1,3,26\n" +
- "2,3,46\n" +
- "3,4,34\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @Test
- public void testOnTargetWithNoCommonKeys() throws Exception {
- /*
- * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet
- * - the iterator becomes empty.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
- new DoubleValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,20\n" +
- "1,3,40\n" +
- "2,3,40\n" +
- "3,4,80\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,140\n";
- }
-
- @Test
- public void testOnTargetWithCustom() throws Exception {
- /*
- * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
- new CustomValueMapper());
-
- result.getEdges().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,10\n" +
- "1,3,20\n" +
- "2,3,20\n" +
- "3,4,40\n" +
- "3,5,35\n" +
- "4,5,45\n" +
- "5,1,51\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f0 + tuple.f1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
- public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
- return new Tuple3<Long, Long, Boolean>(edge.getSource(),
- edge.getTarget(), true);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
- public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
- if(tuple.f1) {
- return tuple.f0 * 2;
- }
- else {
- return tuple.f0;
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f1 * 2;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
- public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
- return (long) tuple.f1.getIntField();
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
- return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
- public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
- return new Tuple2<Long, Boolean>(edge.getSource(), true);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
- return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
- public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
- return new Tuple2<Long, Boolean>(edge.getTarget(), true);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
deleted file mode 100644
index 0574265..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
-
- public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testJoinWithVertexSet() throws Exception {
- /*
- * Test joinWithVertices with the input DataSet parameter identical
- * to the vertex DataSet
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices()
- .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,4\n" +
- "3,6\n" +
- "4,8\n" +
- "5,10\n";
- }
-
- @Test
- public void testWithLessElements() throws Exception {
- /*
- * Test joinWithVertices with the input DataSet passed as a parameter containing
- * less elements than the vertex DataSet, but of the same type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
- .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,4\n" +
- "3,6\n" +
- "4,4\n" +
- "5,5\n";
- }
-
- @Test
- public void testWithDifferentType() throws Exception {
- /*
- * Test joinWithVertices with the input DataSet passed as a parameter containing
- * less elements than the vertex DataSet and of a different type(Boolean)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
- .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper());
-
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,4\n" +
- "3,6\n" +
- "4,4\n" +
- "5,5\n";
- }
-
- @Test
- public void testWithDifferentKeys() throws Exception {
- /*
- * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet
- * - the iterator becomes empty.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
- new ProjectSecondMapper());
-
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,10\n" +
- "2,20\n" +
- "3,30\n" +
- "4,40\n" +
- "5,5\n";
- }
-
- @Test
- public void testWithCustomType() throws Exception {
- /*
- * Test joinWithVertices with a DataSet containing custom parametrised type input values
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
- new CustomValueMapper());
-
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,10\n" +
- "2,20\n" +
- "3,30\n" +
- "4,40\n" +
- "5,5\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f0 + tuple.f1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
- public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
- return new Tuple2<Long, Boolean>(vertex.getId(), true);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
- public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
- if(tuple.f1) {
- return tuple.f0 * 2;
- }
- else {
- return tuple.f0;
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
- public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
- return (long) tuple.f1.getIntField();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
deleted file mode 100755
index dfb0f3f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagationExample;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
-
- public LabelPropagationExampleITCase(ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testSingleIteration() throws Exception {
- /*
- * Test one iteration of label propagation example with a simple graph
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 30\n" +
- "4 40\n" +
- "5 40\n" +
- "6 40\n" +
- "7 70\n";
-
- final String edges = "1 3\n" +
- "2 3\n" +
- "4 7\n" +
- "5 7\n" +
- "6 7\n" +
- "7 3\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "7", "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,40\n" +
- "5,40\n" +
- "6,40\n" +
- "7,40\n";
- }
-
- @Test
- public void testTieBreaker() throws Exception {
- /*
- * Test the label propagation example where a tie must be broken
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 10\n" +
- "4 10\n" +
- "5 0\n" +
- "6 20\n" +
- "7 20\n" +
- "8 20\n" +
- "9 20\n";
-
- final String edges = "1 5\n" +
- "2 5\n" +
- "3 5\n" +
- "4 5\n" +
- "6 5\n" +
- "7 5\n" +
- "8 5\n" +
- "9 5\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "9", "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,10\n" +
- "5,20\n" +
- "6,20\n" +
- "7,20\n" +
- "8,20\n" +
- "9,20\n";
- }
-
- @Test
- public void testTermination() throws Exception {
- /*
- * Test the label propagation example where the algorithm terminates on the first iteration
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 10\n" +
- "4 40\n" +
- "5 40\n" +
- "6 40\n";
-
- final String edges = "1 2\n" +
- "2 3\n" +
- "3 1\n" +
- "4 5\n" +
- "5 6\n" +
- "6 4\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagationExample.main(new String[]{verticesPath, edgesPath, resultPath, "6", "2"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,40\n" +
- "5,40\n" +
- "6,40\n";
- }
-
- // -------------------------------------------------------------------------
- // Util methods
- // -------------------------------------------------------------------------
-
- private String createTempFile(final String rows) throws Exception {
- File tempFile = tempFolder.newFile();
- Files.write(rows, tempFile, Charsets.UTF_8);
- return tempFile.toURI().toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
deleted file mode 100644
index f7a585d..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapEdgesITCase extends MultipleProgramsTestBase {
-
- public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testWithSameValue() throws Exception {
- /*
- * Test mapEdges() keeping the same value type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2,13\n" +
- "1,3,14\n" +
- "2,3,24\n" +
- "3,4,35\n" +
- "3,5,36\n" +
- "4,5,46\n" +
- "5,1,52\n";
- }
-
- @Test
- public void testWithStringValue() throws Exception {
- /*
- * Test mapEdges() and change the value type to String
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2,string(12)\n" +
- "1,3,string(13)\n" +
- "2,3,string(23)\n" +
- "3,4,string(34)\n" +
- "3,5,string(35)\n" +
- "4,5,string(45)\n" +
- "5,1,string(51)\n";
- }
-
- @Test
- public void testWithTuple1Type() throws Exception {
- /*
- * Test mapEdges() and change the value type to a Tuple1
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,(12)\n" +
- "1,3,(13)\n" +
- "2,3,(23)\n" +
- "3,4,(34)\n" +
- "3,5,(35)\n" +
- "4,5,(45)\n" +
- "5,1,(51)\n";
- }
-
- @Test
- public void testWithCustomType() throws Exception {
- /*
- * Test mapEdges() and change the value type to a custom type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,(T,12)\n" +
- "1,3,(T,13)\n" +
- "2,3,(T,23)\n" +
- "3,4,(T,34)\n" +
- "3,5,(T,35)\n" +
- "4,5,(T,45)\n" +
- "5,1,(T,51)\n";
- }
-
- @Test
- public void testWithParametrizedCustomType() throws Exception {
- /*
- * Test mapEdges() and change the value type to a parameterized custom type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
- new ToCustomParametrizedTypeMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2,(12.0,12)\n" +
- "1,3,(13.0,13)\n" +
- "2,3,(23.0,23)\n" +
- "3,4,(34.0,34)\n" +
- "3,5,(35.0,35)\n" +
- "4,5,(45.0,45)\n" +
- "5,1,(51.0,51)\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
- public Long map(Edge<Long, Long> edge) throws Exception {
- return edge.getValue()+1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
- public String map(Edge<Long, Long> edge) throws Exception {
- return String.format("string(%d)", edge.getValue());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
- public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
- Tuple1<Long> tupleValue = new Tuple1<Long>();
- tupleValue.setFields(edge.getValue());
- return tupleValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
- public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
- DummyCustomType dummyValue = new DummyCustomType();
- dummyValue.setIntField(edge.getValue().intValue());
- return dummyValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>,
- DummyCustomParameterizedType<Double>> {
-
- public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
- DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
- dummyValue.setIntField(edge.getValue().intValue());
- dummyValue.setTField(new Double(edge.getValue()));
- return dummyValue;
- }
- }
-}
\ No newline at end of file
[3/7] flink git commit: [FLINK-1522][FLINK-1576] Updated
LabelPropagationExample and test
Posted by va...@apache.org.
[FLINK-1522][FLINK-1576] Updated LabelPropagationExample and test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e306c629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e306c629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e306c629
Branch: refs/heads/master
Commit: e306c62968b94ed5124405c9ff8c02147cdba250
Parents: 8961bd1
Author: balidani <ba...@gmail.com>
Authored: Fri Feb 20 11:30:57 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:04:06 2015 +0100
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 60 ++++++------
.../graph/example/LabelPropagationExample.java | 87 ++++++++++++++++--
.../test/LabelPropagationExampleITCase.java | 97 ++++++++++++++++++++
3 files changed, 207 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 8e00f98..13ed002
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -53,34 +53,34 @@ under the License.
</dependency>
</dependencies>
- <!-- See main pom.xml for explanation of profiles -->
- <profiles>
- <profile>
- <id>hadoop-1</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop1--><name>hadoop.profile</name><value>1</value>
- </property>
- </activation>
- <dependencies>
- <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </profile>
- <profile>
- <id>hadoop-2</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop2--><name>!hadoop.profile</name>
- </property>
- </activation>
- </profile>
- </profiles>
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index c490bb3..78cb5d5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -42,31 +43,84 @@ public class LabelPropagationExample implements ProgramDescription {
public static void main(String[] args) throws Exception {
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // Set up the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // Set up the graph
DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+ // Set up the program
DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
new LabelPropagation<Long>(maxIterations)).getVertices();
- verticesWithCommunity.print();
+ // Emit results
+ if(fileOutput) {
+ verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ verticesWithCommunity.print();
+ }
- env.execute();
+ // Execute the program
+ env.execute("Label Propagation Example");
}
- @Override
- public String getDescription() {
- return "Label Propagation Example";
- }
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+ private static boolean fileOutput = false;
+ private static String vertexInputPath = null;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
private static long numVertices = 100;
- private static int maxIterations = 20;
+ private static int maxIterations = 10;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ if(args.length != 5) {
+ System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+ return false;
+ }
+
+ fileOutput = true;
+ vertexInputPath = args[0];
+ edgeInputPath = args[1];
+ outputPath = args[2];
+ numVertices = Integer.parseInt(args[3]);
+ maxIterations = Integer.parseInt(args[4]);
+ } else {
+ System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+ }
+ return true;
+ }
@SuppressWarnings("serial")
private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+
+ if (fileOutput) {
+ return env.readCsvFile(vertexInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+ @Override
+ public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+ return new Vertex<Long, Long>(value.f0, value.f1);
+ }
+ });
+ }
+
return env.generateSequence(1, numVertices).map(
new MapFunction<Long, Vertex<Long, Long>>() {
public Vertex<Long, Long> map(Long l) throws Exception {
@@ -77,6 +131,20 @@ public class LabelPropagationExample implements ProgramDescription {
@SuppressWarnings("serial")
private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+
+ if (fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+ @Override
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+ return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+ }
+ });
+ }
+
return env.generateSequence(1, numVertices).flatMap(
new FlatMapFunction<Long, Edge<Long, NullValue>>() {
@Override
@@ -91,4 +159,9 @@ public class LabelPropagationExample implements ProgramDescription {
}
});
}
+
+ @Override
+ public String getDescription() {
+ return "Label Propagation Example";
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
new file mode 100755
index 0000000..d5b2239
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.LabelPropagationExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
+
+ public LabelPropagationExampleITCase(ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ private String verticesPath;
+ private String edgesPath;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+
+ final String vertices = "1 1\n" +
+ "2 2\n" +
+ "3 3\n" +
+ "4 4\n" +
+ "5 5\n";
+
+ final String edges = "1 2\n" +
+ "1 3\n" +
+ "2 3\n" +
+ "3 4\n" +
+ "3 5\n" +
+ "4 5\n" +
+ "5 1\n";
+
+ File verticesFile = tempFolder.newFile();
+ Files.write(vertices, verticesFile, Charsets.UTF_8);
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(edges, edgesFile, Charsets.UTF_8);
+
+ verticesPath = verticesFile.toURI().toString();
+ edgesPath = edgesFile.toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testLabelPropagation() throws Exception {
+ /*
+ * Test the label propagation example
+ */
+ LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"});
+
+ expectedResult = "1,5\n" +
+ "2,5\n" +
+ "3,5\n" +
+ "4,5\n" +
+ "5,5\n";
+ }
+}
[6/7] flink git commit: [gelly] refactored tests;
removed duplicate data from TestGraphUtils
Posted by va...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
deleted file mode 100644
index 4e2c858..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapVerticesITCase extends MultipleProgramsTestBase {
-
- public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testWithSameValue() throws Exception {
- /*
- * Test mapVertices() keeping the same value type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,2\n" +
- "2,3\n" +
- "3,4\n" +
- "4,5\n" +
- "5,6\n";
- }
-
- @Test
- public void testWithStringValue() throws Exception {
- /*
- * Test mapVertices() and change the value type to String
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,one\n" +
- "2,two\n" +
- "3,three\n" +
- "4,four\n" +
- "5,five\n";
- }
-
- @Test
- public void testWithtuple1Value() throws Exception {
- /*
- * Test mapVertices() and change the value type to a Tuple1
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,(1)\n" +
- "2,(2)\n" +
- "3,(3)\n" +
- "4,(4)\n" +
- "5,(5)\n";
- }
-
- @Test
- public void testWithCustomType() throws Exception {
- /*
- * Test mapVertices() and change the value type to a custom type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,(T,1)\n" +
- "2,(T,2)\n" +
- "3,(T,3)\n" +
- "4,(T,4)\n" +
- "5,(T,5)\n";
- }
-
- @Test
- public void testWithCustomParametrizedType() throws Exception {
- /*
- * Test mapVertices() and change the value type to a parameterized custom type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
- new ToCustomParametrizedTypeMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,(1.0,1)\n" +
- "2,(2.0,2)\n" +
- "3,(3.0,3)\n" +
- "4,(4.0,4)\n" +
- "5,(5.0,5)\n";
- }
-
- @SuppressWarnings("serial")
- private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
- public Long map(Vertex<Long, Long> value) throws Exception {
- return value.getValue()+1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
- public String map(Vertex<Long, Long> vertex) throws Exception {
- String stringValue;
- if (vertex.getValue() == 1) {
- stringValue = "one";
- }
- else if (vertex.getValue() == 2) {
- stringValue = "two";
- }
- else if (vertex.getValue() == 3) {
- stringValue = "three";
- }
- else if (vertex.getValue() == 4) {
- stringValue = "four";
- }
- else if (vertex.getValue() == 5) {
- stringValue = "five";
- }
- else {
- stringValue = "";
- }
- return stringValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
- public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
- Tuple1<Long> tupleValue = new Tuple1<Long>();
- tupleValue.setFields(vertex.getValue());
- return tupleValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
- public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
- DummyCustomType dummyValue = new DummyCustomType();
- dummyValue.setIntField(vertex.getValue().intValue());
- return dummyValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>,
- DummyCustomParameterizedType<Double>> {
-
- public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
- DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
- dummyValue.setIntField(vertex.getValue().intValue());
- dummyValue.setTField(new Double(vertex.getValue()));
- return dummyValue;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
deleted file mode 100644
index 29d76f0..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
-
- public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testLowestWeightOutNeighbor() throws Exception {
- /*
- * Get the lowest-weight out-neighbor
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,3\n" +
- "3,4\n" +
- "4,5\n" +
- "5,1\n";
- }
-
- @Test
- public void testLowestWeightInNeighbor() throws Exception {
- /*
- * Get the lowest-weight in-neighbor
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,5\n" +
- "2,1\n" +
- "3,1\n" +
- "4,3\n" +
- "5,3\n";
- }
-
- @Test
- public void testMaxWeightEdge() throws Exception {
- /*
- * Get the maximum weight among all edges
- * of a vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
- graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
- verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,51\n" +
- "2,23\n" +
- "3,35\n" +
- "4,45\n" +
- "5,51\n";
- }
-
- @Test
- public void testLowestWeightOutNeighborNoValue() throws Exception {
- /*
- * Get the lowest-weight out-neighbor
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,2\n" +
- "2,3\n" +
- "3,4\n" +
- "4,5\n" +
- "5,1\n";
- }
-
- @Test
- public void testLowestWeightInNeighborNoValue() throws Exception {
- /*
- * Get the lowest-weight in-neighbor
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,5\n" +
- "2,1\n" +
- "3,1\n" +
- "4,3\n" +
- "5,3\n";
- }
-
- @Test
- public void testMaxWeightAllNeighbors() throws Exception {
- /*
- * Get the maximum weight among all edges
- * of a vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
- graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
- verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,51\n" +
- "2,23\n" +
- "3,35\n" +
- "4,45\n" +
- "5,51\n";
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(
- Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
-
- long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
-
- for (Edge<Long, Long> edge: edges) {
- if (edge.getValue() < weight) {
- weight = edge.getValue();
- minNeighorId = edge.getTarget();
- }
- }
- return new Tuple2<Long, Long>(v.getId(), minNeighorId);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
-
- long weight = Long.MIN_VALUE;
-
- for (Edge<Long, Long> edge: edges) {
- if (edge.getValue() > weight) {
- weight = edge.getValue();
- }
- }
- return new Tuple2<Long, Long>(v.getId(), weight);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
- long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
- long vertexId = -1;
- long i=0;
-
- for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
- if (edge.f1.getValue() < weight) {
- weight = edge.f1.getValue();
- minNeighorId = edge.f1.getTarget();
- }
- if (i==0) {
- vertexId = edge.f0;
- } i++;
- }
- return new Tuple2<Long, Long>(vertexId, minNeighorId);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
- long weight = Long.MIN_VALUE;
- long vertexId = -1;
- long i=0;
-
- for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
- if (edge.f1.getValue() > weight) {
- weight = edge.f1.getValue();
- }
- if (i==0) {
- vertexId = edge.f0;
- } i++;
- }
- return new Tuple2<Long, Long>(vertexId, weight);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(
- Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
-
- long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
-
- for (Edge<Long, Long> edge: edges) {
- if (edge.getValue() < weight) {
- weight = edge.getValue();
- minNeighorId = edge.getSource();
- }
- }
- return new Tuple2<Long, Long>(v.getId(), minNeighorId);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
- long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
- long vertexId = -1;
- long i=0;
-
- for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
- if (edge.f1.getValue() < weight) {
- weight = edge.f1.getValue();
- minNeighorId = edge.f1.getSource();
- }
- if (i==0) {
- vertexId = edge.f0;
- } i++;
- }
- return new Tuple2<Long, Long>(vertexId, minNeighorId);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
deleted file mode 100644
index d385399..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunction;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
-
- public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testSumOfOutNeighbors() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
- }
-
- @Test
- public void testSumOfInNeighbors() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
-
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
- expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
- }
-
- @Test
- public void testSumOfOAllNeighbors() throws Exception {
- /*
- * Get the sum of all neighbor values
- * including own vertex value
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,11\n" +
- "2,6\n" +
- "3,15\n" +
- "4,12\n" +
- "5,13\n";
- }
-
- @Test
- public void testSumOfOutNeighborsNoValue() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
- }
-
- @Test
- public void testSumOfInNeighborsNoValue() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
-
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
- }
-
- @Test
- public void testSumOfAllNeighborsNoValue() throws Exception {
- /*
- * Get the sum of all neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
-
- expectedResult = "1,10\n" +
- "2,4\n" +
- "3,12\n" +
- "4,8\n" +
- "5,8\n";
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- return new Tuple2<Long, Long>(vertex.getId(), sum);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f0.getValue() * neighbor.f1.getValue();
- }
- return new Tuple2<Long, Long>(vertex.getId(), sum);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue();
- }
- return new Tuple2<Long, Long>(next.f0, sum);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue() * next.f1.getValue();
- }
- return new Tuple2<Long, Long>(next.f0, sum);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue();
- }
- return new Tuple2<Long, Long>(next.f0, sum);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 9f6569b..75355f0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -33,27 +33,47 @@ public class TestGraphUtils {
public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
ExecutionEnvironment env) {
- List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
- vertices.add(new Vertex<Long, Long>(1L, 1L));
- vertices.add(new Vertex<Long, Long>(2L, 2L));
- vertices.add(new Vertex<Long, Long>(3L, 3L));
- vertices.add(new Vertex<Long, Long>(4L, 4L));
- vertices.add(new Vertex<Long, Long>(5L, 5L));
-
- return env.fromCollection(vertices);
+
+ return env.fromCollection(getLongLongVertices());
}
public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
ExecutionEnvironment env) {
- List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
- edges.add(new Edge<Long, Long>(1L, 2L, 12L));
- edges.add(new Edge<Long, Long>(1L, 3L, 13L));
- edges.add(new Edge<Long, Long>(2L, 3L, 23L));
- edges.add(new Edge<Long, Long>(3L, 4L, 34L));
- edges.add(new Edge<Long, Long>(3L, 5L, 35L));
- edges.add(new Edge<Long, Long>(4L, 5L, 45L));
- edges.add(new Edge<Long, Long>(5L, 1L, 51L));
-
+
+ return env.fromCollection(getLongLongEdges());
+ }
+
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
+ ExecutionEnvironment env) {
+ List<Edge<Long, Long>> edges = getLongLongEdges();
+
+ edges.remove(1);
+ edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
+ ExecutionEnvironment env) {
+ List<Edge<Long, Long>> edges = getLongLongEdges();
+
+ edges.remove(0);
+ edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
+ ExecutionEnvironment env) {
+ List<Edge<Long, Long>> edges = getLongLongEdges();
+
+ edges.remove(0);
+ edges.remove(1);
+ edges.remove(2);
+ edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+ edges.add(new Edge<Long, Long>(1L, 12L, 12L));
+ edges.add(new Edge<Long, Long>(13L, 33L, 13L));
+
return env.fromCollection(edges);
}
@@ -193,12 +213,10 @@ public class TestGraphUtils {
*/
public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
ExecutionEnvironment env) {
- List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ List<Vertex<Long, Long>> vertices = getLongLongVertices();
+
+ vertices.remove(0);
vertices.add(new Vertex<Long, Long>(15L, 1L));
- vertices.add(new Vertex<Long, Long>(2L, 2L));
- vertices.add(new Vertex<Long, Long>(3L, 3L));
- vertices.add(new Vertex<Long, Long>(4L, 4L));
- vertices.add(new Vertex<Long, Long>(5L, 5L));
return env.fromCollection(vertices);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
new file mode 100755
index 0000000..185d922
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.LabelPropagationExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
+
+ public LabelPropagationExampleITCase(ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of label propagation example with a simple graph
+ */
+
+ final String vertices = "1 10\n" +
+ "2 10\n" +
+ "3 30\n" +
+ "4 40\n" +
+ "5 40\n" +
+ "6 40\n" +
+ "7 70\n";
+
+ final String edges = "1 3\n" +
+ "2 3\n" +
+ "4 7\n" +
+ "5 7\n" +
+ "6 7\n" +
+ "7 3\n";
+
+ String verticesPath = createTempFile(vertices);
+ String edgesPath = createTempFile(edges);
+
+ LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
+
+ expectedResult = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,40\n" +
+ "5,40\n" +
+ "6,40\n" +
+ "7,40\n";
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test the label propagation example where a tie must be broken
+ */
+
+ final String vertices = "1 10\n" +
+ "2 10\n" +
+ "3 10\n" +
+ "4 10\n" +
+ "5 0\n" +
+ "6 20\n" +
+ "7 20\n" +
+ "8 20\n" +
+ "9 20\n";
+
+ final String edges = "1 5\n" +
+ "2 5\n" +
+ "3 5\n" +
+ "4 5\n" +
+ "6 5\n" +
+ "7 5\n" +
+ "8 5\n" +
+ "9 5\n";
+
+ String verticesPath = createTempFile(vertices);
+ String edgesPath = createTempFile(edges);
+
+ LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
+
+ expectedResult = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,10\n" +
+ "5,20\n" +
+ "6,20\n" +
+ "7,20\n" +
+ "8,20\n" +
+ "9,20\n";
+ }
+
+ // -------------------------------------------------------------------------
+ // Util methods
+ // -------------------------------------------------------------------------
+
+ private String createTempFile(final String rows) throws Exception {
+ File tempFile = tempFolder.newFile();
+ Files.write(rows, tempFile, Charsets.UTF_8);
+ return tempFile.toURI().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
new file mode 100644
index 0000000..8c363a5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DegreesITCase extends MultipleProgramsTestBase {
+
+ public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testOutDegrees() throws Exception {
+ /*
+ * Test outDegrees()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.outDegrees().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,1\n" +
+ "3,2\n" +
+ "4,1\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testOutDegreesWithNoOutEdges() throws Exception {
+ /*
+ * Test outDegrees() no outgoing edges
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+ graph.outDegrees().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,3\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,1\n" +
+ "5,0\n";
+ }
+
+ @Test
+ public void testInDegrees() throws Exception {
+ /*
+ * Test inDegrees()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.inDegrees().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,1\n" +
+ "2,1\n" +
+ "3,2\n" +
+ "4,1\n" +
+ "5,2\n";
+ }
+
+ @Test
+ public void testInDegreesWithNoInEdge() throws Exception {
+ /*
+ * Test inDegrees() no ingoing edge
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+ graph.inDegrees().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,0\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,1\n" +
+ "5,3\n";
+ }
+
+ @Test
+ public void testGetDegrees() throws Exception {
+ /*
+ * Test getDegrees()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.getDegrees().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,3\n" +
+ "2,2\n" +
+ "3,4\n" +
+ "4,2\n" +
+ "5,3\n";
+ }
+
+ @Test
+ public void testGetDegreesWithDisconnectedData() throws Exception {
+ /*
+ * Test getDegrees() with disconnected data
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, NullValue, Long> graph =
+ Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+
+ graph.outDegrees().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2\n" +
+ "2,1\n" +
+ "3,0\n" +
+ "4,1\n" +
+ "5,0\n";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
new file mode 100644
index 0000000..975d21a
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FromCollectionITCase extends MultipleProgramsTestBase {
+
+ public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testFromCollectionVerticesEdges() throws Exception {
+ /*
+ * Test fromCollection(vertices, edges):
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env);
+
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testFromCollectionEdgesNoInitialValue() throws Exception {
+ /*
+ * Test fromCollection(edges) with no initial value for the vertices
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+ env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,(null)\n" +
+ "2,(null)\n" +
+ "3,(null)\n" +
+ "4,(null)\n" +
+ "5,(null)\n";
+ }
+
+ @Test
+ public void testFromCollectionEdgesWithInitialValue() throws Exception {
+ /*
+ * Test fromCollection(edges) with vertices initialised by a
+ * function that takes the id and doubles it
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+ new InitVerticesMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2\n" +
+ "2,4\n" +
+ "3,6\n" +
+ "4,8\n" +
+ "5,10\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitVerticesMapper implements MapFunction<Long, Long> {
+ public Long map(Long vertexId) {
+ return vertexId * 2;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
new file mode 100644
index 0000000..6848dad
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationITCase extends MultipleProgramsTestBase {
+
+ public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testCreateWithoutVertexValues() throws Exception {
+ /*
+ * Test create() with edge dataset and no vertex values
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,(null)\n" +
+ "2,(null)\n" +
+ "3,(null)\n" +
+ "4,(null)\n" +
+ "5,(null)\n";
+ }
+
+ @Test
+ public void testCreateWithMapper() throws Exception {
+ /*
+ * Test create() with edge dataset and a mapper that assigns the id as value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+ new AssignIdAsValueMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,1\n" +
+ "2,2\n" +
+ "3,3\n" +
+ "4,4\n" +
+ "5,5\n";
+ }
+
+ @Test
+ public void testCreateWithCustomVertexValue() throws Exception {
+ /*
+ * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
+ TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,(2.0,0)\n" +
+ "2,(4.0,1)\n" +
+ "3,(6.0,2)\n" +
+ "4,(8.0,3)\n" +
+ "5,(10.0,4)\n";
+ }
+
+ @Test
+ public void testValidate() throws Exception {
+ /*
+ * Test validate():
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
+ DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+ DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+ result.writeAsText(resultPath);
+ env.execute();
+
+ expectedResult = "true\n";
+ }
+
+ @Test
+ public void testValidateWithInvalidIds() throws Exception {
+ /*
+ * Test validate() - invalid vertex ids
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
+ DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+ DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+ result.writeAsText(resultPath);
+ env.execute();
+
+ expectedResult = "false\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
+ public Long map(Long vertexId) {
+ return vertexId;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignCustomVertexValueMapper implements
+ MapFunction<Long, DummyCustomParameterizedType<Double>> {
+
+ DummyCustomParameterizedType<Double> dummyValue =
+ new DummyCustomParameterizedType<Double>();
+
+ public DummyCustomParameterizedType<Double> map(Long vertexId) {
+ dummyValue.setIntField(vertexId.intValue()-1);
+ dummyValue.setTField(vertexId*2.0);
+ return dummyValue;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
new file mode 100644
index 0000000..010ae1d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
+
+ public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testWithDoubleValueMapper() throws Exception {
+ /*
+ * Test create() with edge dataset and a mapper that assigns a double constant as value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+ new AssignDoubleValueMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,0.1\n" +
+ "2,0.1\n" +
+ "3,0.1\n" +
+ "4,0.1\n" +
+ "5,0.1\n";
+ }
+
+ @Test
+ public void testWithTuple2ValueMapper() throws Exception {
+ /*
+ * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
+ TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,(2,42)\n" +
+ "2,(4,42)\n" +
+ "3,(6,42)\n" +
+ "4,(8,42)\n" +
+ "5,(10,42)\n";
+ }
+
+ @Test
+ public void testWithConstantValueMapper() throws Exception {
+ /*
+ * Test create() with edge dataset with String key type
+ * and a mapper that assigns a double constant as value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
+ new AssignDoubleConstantMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,0.1\n" +
+ "2,0.1\n" +
+ "3,0.1\n" +
+ "4,0.1\n" +
+ "5,0.1\n";
+ }
+
+ @Test
+ public void testWithDCustomValueMapper() throws Exception {
+ /*
+ * Test create() with edge dataset and a mapper that assigns a custom vertex value
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
+ TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,(F,0)\n" +
+ "2,(F,1)\n" +
+ "3,(F,2)\n" +
+ "4,(F,3)\n" +
+ "5,(F,4)\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 0.1d;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
+ public Tuple2<Long, Long> map(Long vertexId) {
+ return new Tuple2<Long, Long>(vertexId*2, 42l);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
+ public Double map(String value) {
+ return 0.1d;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
+ public DummyCustomType map(Long vertexId) {
+ return new DummyCustomType(vertexId.intValue()-1, false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
new file mode 100644
index 0000000..502d529
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphMutationsITCase extends MultipleProgramsTestBase {
+
+ public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testAddVertex() throws Exception {
+ /*
+ * Test addVertex() -- simple case
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+ edges.add(new Edge<Long, Long>(6L, 1L, 61L));
+ graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n" +
+ "6,1,61\n";
+ }
+
+ @Test
+ public void testAddVertexExisting() throws Exception {
+ /*
+ * Test addVertex() -- add an existing vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+ edges.add(new Edge<Long, Long>(1L, 5L, 15L));
+ graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges);
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "1,5,15\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testAddVertexNoEdges() throws Exception {
+ /*
+ * Test addVertex() -- add vertex with empty edge set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+ graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,1\n" +
+ "2,2\n" +
+ "3,3\n" +
+ "4,4\n" +
+ "5,5\n" +
+ "6,6\n";
+ }
+
+ @Test
+ public void testRemoveVertex() throws Exception {
+ /*
+ * Test removeVertex() -- simple case
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n";
+ }
+
+ @Test
+ public void testRemoveInvalidVertex() throws Exception {
+ /*
+ * Test removeVertex() -- remove an invalid vertex
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testAddEdge() throws Exception {
+ /*
+ * Test addEdge() -- simple case
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
+ 61L);
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n" +
+ "6,1,61\n";
+ }
+
+ @Test
+ public void testAddExistingEdge() throws Exception {
+ /*
+ * Test addEdge() -- add already existing edge
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
+ 12L);
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testRemoveVEdge() throws Exception {
+ /*
+ * Test removeEdge() -- simple case
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n";
+ }
+
+ @Test
+ public void testRemoveInvalidEdge() throws Exception {
+ /*
+ * Test removeEdge() -- invalid edge
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
new file mode 100644
index 0000000..6c4f1ef
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphOperationsITCase extends MultipleProgramsTestBase {
+
+ public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testUndirected() throws Exception {
+ /*
+ * Test getUndirected()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.getUndirected().getEdges().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2,12\n" + "2,1,12\n" +
+ "1,3,13\n" + "3,1,13\n" +
+ "2,3,23\n" + "3,2,23\n" +
+ "3,4,34\n" + "4,3,34\n" +
+ "3,5,35\n" + "5,3,35\n" +
+ "4,5,45\n" + "5,4,45\n" +
+ "5,1,51\n" + "1,5,51\n";
+ }
+
+ @Test
+ public void testReverse() throws Exception {
+ /*
+ * Test reverse()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.reverse().getEdges().writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "2,1,12\n" +
+ "3,1,13\n" +
+ "3,2,23\n" +
+ "4,3,34\n" +
+ "5,3,35\n" +
+ "5,4,45\n" +
+ "1,5,51\n";
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testSubGraph() throws Exception {
+ /*
+ * Test subgraph:
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
+ public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+ return (vertex.getValue() > 2);
+ }
+ },
+ new FilterFunction<Edge<Long, Long>>() {
+ public boolean filter(Edge<Long, Long> edge) throws Exception {
+ return (edge.getValue() > 34);
+ }
+ }).getEdges().writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "3,5,35\n" +
+ "4,5,45\n";
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testFilterVertices() throws Exception {
+ /*
+ * Test filterOnVertices:
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
+ public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+ return (vertex.getValue() > 2);
+ }
+ }).getEdges().writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n";
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testFilterEdges() throws Exception {
+ /*
+ * Test filterOnEdges:
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
+ public boolean filter(Edge<Long, Long> edge) throws Exception {
+ return (edge.getValue() > 34);
+ }
+ }).getEdges().writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testNumberOfVertices() throws Exception {
+ /*
+ * Test numberOfVertices()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.numberOfVertices().writeAsText(resultPath);
+
+ env.execute();
+ expectedResult = "5";
+ }
+
+ @Test
+ public void testNumberOfEdges() throws Exception {
+ /*
+ * Test numberOfEdges()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.numberOfEdges().writeAsText(resultPath);
+
+ env.execute();
+ expectedResult = "7";
+ }
+
+ @Test
+ public void testVertexIds() throws Exception {
+ /*
+ * Test getVertexIds()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.getVertexIds().writeAsText(resultPath);
+
+ env.execute();
+ expectedResult = "1\n2\n3\n4\n5\n";
+ }
+
+ @Test
+ public void testEdgesIds() throws Exception {
+ /*
+ * Test getEdgeIds()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+ graph.getEdgeIds().writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "1,2\n" + "1,3\n" +
+ "2,3\n" + "3,4\n" +
+ "3,5\n" + "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testUnion() throws Exception {
+ /*
+ * Test union()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+
+ vertices.add(new Vertex<Long, Long>(6L, 6L));
+ edges.add(new Edge<Long, Long>(6L, 1L, 61L));
+
+ graph = graph.union(Graph.fromCollection(vertices, edges, env));
+
+ graph.getEdges().writeAsCsv(resultPath);
+
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n" +
+ "6,1,61\n";
+ }
+}
\ No newline at end of file
[4/7] flink git commit: [FLINK-1522][FLINK-1576][gelly] Added more
test cases for Label Propagation
Posted by va...@apache.org.
[FLINK-1522][FLINK-1576][gelly] Added more test cases for Label Propagation
This closes #441
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b529b62a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b529b62a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b529b62a
Branch: refs/heads/master
Commit: b529b62a4f2e4b7482564ad6c65996c8f5b0d824
Parents: e306c62
Author: balidani <ba...@gmail.com>
Authored: Tue Mar 3 18:58:59 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:30:58 2015 +0100
----------------------------------------------------------------------
.../test/LabelPropagationExampleITCase.java | 145 ++++++++++++++-----
1 file changed, 112 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b529b62a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
index d5b2239..dfb0f3f 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
@@ -42,56 +42,135 @@ public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
private String resultPath;
private String expectedResult;
- private String verticesPath;
- private String edgesPath;
-
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Before
public void before() throws Exception{
resultPath = tempFolder.newFile().toURI().toString();
+ }
- final String vertices = "1 1\n" +
- "2 2\n" +
- "3 3\n" +
- "4 4\n" +
- "5 5\n";
-
- final String edges = "1 2\n" +
- "1 3\n" +
- "2 3\n" +
- "3 4\n" +
- "3 5\n" +
- "4 5\n" +
- "5 1\n";
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
- File verticesFile = tempFolder.newFile();
- Files.write(vertices, verticesFile, Charsets.UTF_8);
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of label propagation example with a simple graph
+ */
- File edgesFile = tempFolder.newFile();
- Files.write(edges, edgesFile, Charsets.UTF_8);
+ final String vertices = "1 10\n" +
+ "2 10\n" +
+ "3 30\n" +
+ "4 40\n" +
+ "5 40\n" +
+ "6 40\n" +
+ "7 70\n";
- verticesPath = verticesFile.toURI().toString();
- edgesPath = edgesFile.toURI().toString();
+ final String edges = "1 3\n" +
+ "2 3\n" +
+ "4 7\n" +
+ "5 7\n" +
+ "6 7\n" +
+ "7 3\n";
+
+ String verticesPath = createTempFile(vertices);
+ String edgesPath = createTempFile(edges);
+
+ LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "7", "1"});
+
+ expectedResult = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,40\n" +
+ "5,40\n" +
+ "6,40\n" +
+ "7,40\n";
}
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test the label propagation example where a tie must be broken
+ */
+
+ final String vertices = "1 10\n" +
+ "2 10\n" +
+ "3 10\n" +
+ "4 10\n" +
+ "5 0\n" +
+ "6 20\n" +
+ "7 20\n" +
+ "8 20\n" +
+ "9 20\n";
+
+ final String edges = "1 5\n" +
+ "2 5\n" +
+ "3 5\n" +
+ "4 5\n" +
+ "6 5\n" +
+ "7 5\n" +
+ "8 5\n" +
+ "9 5\n";
+
+ String verticesPath = createTempFile(vertices);
+ String edgesPath = createTempFile(edges);
+
+ LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "9", "1"});
+
+ expectedResult = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,10\n" +
+ "5,20\n" +
+ "6,20\n" +
+ "7,20\n" +
+ "8,20\n" +
+ "9,20\n";
}
@Test
- public void testLabelPropagation() throws Exception {
+ public void testTermination() throws Exception {
/*
- * Test the label propagation example
+ * Test the label propagation example where the algorithm terminates on the first iteration
*/
- LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"});
- expectedResult = "1,5\n" +
- "2,5\n" +
- "3,5\n" +
- "4,5\n" +
- "5,5\n";
+ final String vertices = "1 10\n" +
+ "2 10\n" +
+ "3 10\n" +
+ "4 40\n" +
+ "5 40\n" +
+ "6 40\n";
+
+ final String edges = "1 2\n" +
+ "2 3\n" +
+ "3 1\n" +
+ "4 5\n" +
+ "5 6\n" +
+ "6 4\n";
+
+ String verticesPath = createTempFile(vertices);
+ String edgesPath = createTempFile(edges);
+
+ LabelPropagationExample.main(new String[]{verticesPath, edgesPath, resultPath, "6", "2"});
+
+ expectedResult = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,40\n" +
+ "5,40\n" +
+ "6,40\n";
+ }
+
+ // -------------------------------------------------------------------------
+ // Util methods
+ // -------------------------------------------------------------------------
+
+ private String createTempFile(final String rows) throws Exception {
+ File tempFile = tempFolder.newFile();
+ Files.write(rows, tempFile, Charsets.UTF_8);
+ return tempFile.toURI().toString();
}
}
[2/7] flink git commit: [FLINK-1522][gelly] Fixed faulty split into
create/runVertexCentricIteration
Posted by va...@apache.org.
[FLINK-1522][gelly] Fixed faulty split into create/runVertexCentricIteration
This closes #429
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8961bd19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8961bd19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8961bd19
Branch: refs/heads/master
Commit: 8961bd19a82e44eb8655232f37a5baadb59a964b
Parents: f329e4e
Author: andralungu <lu...@gmail.com>
Authored: Wed Mar 4 00:13:08 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:03:47 2015 +0100
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 31 ++++++++++++++++++++
.../library/SingleSourceShortestPaths.java | 9 +++---
2 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8961bd19/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
index af1fcb4..8e00f98 100644
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -52,4 +52,35 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/8961bd19/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index f069000..3e9a29d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -45,11 +45,12 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
- VertexCentricIteration<K, Double, Double, Double> iteration = input.mapVertices(
- new InitVerticesMapper<K>(srcVertexId)).createVertexCentricIteration(new VertexDistanceUpdater<K>(),
- new MinDistanceMessenger<K>(), maxIterations);
+ Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId));
- return input.runVertexCentricIteration(iteration);
+ VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration(
+ new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), maxIterations);
+
+ return mappedInput.runVertexCentricIteration(iteration);
}
public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
[5/7] flink git commit: [gelly] refactored tests;
removed duplicate data from TestGraphUtils
Posted by va...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
new file mode 100644
index 0000000..dfb315e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
+
+ public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testWithEdgesInputDataset() throws Exception {
+ /*
+ * Test joinWithEdges with the input DataSet parameter identical
+ * to the edge DataSet
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
+ .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,68\n" +
+ "3,5,70\n" +
+ "4,5,90\n" +
+ "5,1,102\n";
+ }
+
+ @Test
+ public void testWithLessElements() throws Exception {
+ /*
+ * Test joinWithEdges with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet, but of the same type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+ .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testWithLessElementsDifferentType() throws Exception {
+ /*
+ * Test joinWithEdges with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet and of a different type(Boolean)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+ .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testWithNoCommonKeys() throws Exception {
+ /*
+ * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
+ * - the iterator becomes empty.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
+ new DoubleValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,68\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testWithCustomType() throws Exception {
+ /*
+ * Test joinWithEdges with a DataSet containing custom parametrised type input values
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
+ new CustomValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,10\n" +
+ "1,3,20\n" +
+ "2,3,30\n" +
+ "3,4,40\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testWithEdgesOnSource() throws Exception {
+ /*
+ * Test joinWithEdgesOnSource with the input DataSet parameter identical
+ * to the edge DataSet
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
+ .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,25\n" +
+ "2,3,46\n" +
+ "3,4,68\n" +
+ "3,5,69\n" +
+ "4,5,90\n" +
+ "5,1,102\n";
+ }
+
+ @Test
+ public void testOnSourceWithLessElements() throws Exception {
+ /*
+ * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet, but of the same type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+ .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,25\n" +
+ "2,3,46\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testOnSourceWithDifferentType() throws Exception {
+ /*
+ * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet and of a different type(Boolean)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+ .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testOnSourceWithNoCommonKeys() throws Exception {
+ /*
+ * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
+ * - the iterator becomes empty.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
+ new DoubleValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,20\n" +
+ "1,3,20\n" +
+ "2,3,60\n" +
+ "3,4,80\n" +
+ "3,5,80\n" +
+ "4,5,120\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testOnSourceWithCustom() throws Exception {
+ /*
+ * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
+ new CustomValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,10\n" +
+ "1,3,10\n" +
+ "2,3,30\n" +
+ "3,4,40\n" +
+ "3,5,40\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testWithEdgesOnTarget() throws Exception {
+ /*
+ * Test joinWithEdgesOnTarget with the input DataSet parameter identical
+ * to the edge DataSet
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
+ .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,36\n" +
+ "3,4,68\n" +
+ "3,5,70\n" +
+ "4,5,80\n" +
+ "5,1,102\n";
+ }
+
+ @Test
+ public void testWithOnTargetWithLessElements() throws Exception {
+ /*
+ * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet, but of the same type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+ .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,36\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testOnTargetWithDifferentType() throws Exception {
+ /*
+ * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
+ * less elements than the edge DataSet and of a different type(Boolean)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+ .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,24\n" +
+ "1,3,26\n" +
+ "2,3,46\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @Test
+ public void testOnTargetWithNoCommonKeys() throws Exception {
+ /*
+ * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet
+ * - the iterator becomes empty.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
+ new DoubleValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,20\n" +
+ "1,3,40\n" +
+ "2,3,40\n" +
+ "3,4,80\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,140\n";
+ }
+
+ @Test
+ public void testOnTargetWithCustom() throws Exception {
+ /*
+ * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
+ new CustomValueMapper());
+
+ result.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,10\n" +
+ "1,3,20\n" +
+ "2,3,20\n" +
+ "3,4,40\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+ public Long map(Tuple2<Long, Long> tuple) throws Exception {
+ return tuple.f0 + tuple.f1;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
+ public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+ return new Tuple3<Long, Long, Boolean>(edge.getSource(),
+ edge.getTarget(), true);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+ public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+ if(tuple.f1) {
+ return tuple.f0 * 2;
+ }
+ else {
+ return tuple.f0;
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+ public Long map(Tuple2<Long, Long> tuple) throws Exception {
+ return tuple.f1 * 2;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+ public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
+ return (long) tuple.f1.getIntField();
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+ public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+ return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+ public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+ return new Tuple2<Long, Boolean>(edge.getSource(), true);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+ public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+ return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+ public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+ return new Tuple2<Long, Boolean>(edge.getTarget(), true);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
new file mode 100644
index 0000000..28a0441
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
+
+ public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testJoinWithVertexSet() throws Exception {
+ /*
+ * Test joinWithVertices with the input DataSet parameter identical
+ * to the vertex DataSet
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices()
+ .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
+
+ result.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,4\n" +
+ "3,6\n" +
+ "4,8\n" +
+ "5,10\n";
+ }
+
+ @Test
+ public void testWithLessElements() throws Exception {
+ /*
+ * Test joinWithVertices with the input DataSet passed as a parameter containing
+ * less elements than the vertex DataSet, but of the same type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
+ .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
+
+ result.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,4\n" +
+ "3,6\n" +
+ "4,4\n" +
+ "5,5\n";
+ }
+
+ @Test
+ public void testWithDifferentType() throws Exception {
+ /*
+ * Test joinWithVertices with the input DataSet passed as a parameter containing
+ * less elements than the vertex DataSet and of a different type(Boolean)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
+ .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper());
+
+ result.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,4\n" +
+ "3,6\n" +
+ "4,4\n" +
+ "5,5\n";
+ }
+
+ @Test
+ public void testWithDifferentKeys() throws Exception {
+ /*
+ * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet
+ * - the iterator becomes empty.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
+ new ProjectSecondMapper());
+
+ result.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,10\n" +
+ "2,20\n" +
+ "3,30\n" +
+ "4,40\n" +
+ "5,5\n";
+ }
+
+ @Test
+ public void testWithCustomType() throws Exception {
+ /*
+ * Test joinWithVertices with a DataSet containing custom parametrised type input values
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
+ new CustomValueMapper());
+
+ result.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,10\n" +
+ "2,20\n" +
+ "3,30\n" +
+ "4,40\n" +
+ "5,5\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+ public Long map(Tuple2<Long, Long> tuple) throws Exception {
+ return tuple.f0 + tuple.f1;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
+ public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
+ return new Tuple2<Long, Boolean>(vertex.getId(), true);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+ public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+ if(tuple.f1) {
+ return tuple.f0 * 2;
+ }
+ else {
+ return tuple.f0;
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+ public Long map(Tuple2<Long, Long> tuple) throws Exception {
+ return tuple.f1;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+ public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
+ return (long) tuple.f1.getIntField();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
new file mode 100644
index 0000000..a5c01cf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapEdgesITCase extends MultipleProgramsTestBase {
+
+ public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testWithSameValue() throws Exception {
+ /*
+ * Test mapEdges() keeping the same value type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
+
+ mappedEdges.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2,13\n" +
+ "1,3,14\n" +
+ "2,3,24\n" +
+ "3,4,35\n" +
+ "3,5,36\n" +
+ "4,5,46\n" +
+ "5,1,52\n";
+ }
+
+ @Test
+ public void testWithStringValue() throws Exception {
+ /*
+ * Test mapEdges() and change the value type to String
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
+
+ mappedEdges.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2,string(12)\n" +
+ "1,3,string(13)\n" +
+ "2,3,string(23)\n" +
+ "3,4,string(34)\n" +
+ "3,5,string(35)\n" +
+ "4,5,string(45)\n" +
+ "5,1,string(51)\n";
+ }
+
+ @Test
+ public void testWithTuple1Type() throws Exception {
+ /*
+ * Test mapEdges() and change the value type to a Tuple1
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
+
+ mappedEdges.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,(12)\n" +
+ "1,3,(13)\n" +
+ "2,3,(23)\n" +
+ "3,4,(34)\n" +
+ "3,5,(35)\n" +
+ "4,5,(45)\n" +
+ "5,1,(51)\n";
+ }
+
+ @Test
+ public void testWithCustomType() throws Exception {
+ /*
+ * Test mapEdges() and change the value type to a custom type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
+
+ mappedEdges.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,(T,12)\n" +
+ "1,3,(T,13)\n" +
+ "2,3,(T,23)\n" +
+ "3,4,(T,34)\n" +
+ "3,5,(T,35)\n" +
+ "4,5,(T,45)\n" +
+ "5,1,(T,51)\n";
+ }
+
+ @Test
+ public void testWithParametrizedCustomType() throws Exception {
+ /*
+ * Test mapEdges() and change the value type to a parameterized custom type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
+ new ToCustomParametrizedTypeMapper()).getEdges();
+
+ mappedEdges.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,(12.0,12)\n" +
+ "1,3,(13.0,13)\n" +
+ "2,3,(23.0,23)\n" +
+ "3,4,(34.0,34)\n" +
+ "3,5,(35.0,35)\n" +
+ "4,5,(45.0,45)\n" +
+ "5,1,(51.0,51)\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
+ public Long map(Edge<Long, Long> edge) throws Exception {
+ return edge.getValue()+1;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
+ public String map(Edge<Long, Long> edge) throws Exception {
+ return String.format("string(%d)", edge.getValue());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
+ public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
+ Tuple1<Long> tupleValue = new Tuple1<Long>();
+ tupleValue.setFields(edge.getValue());
+ return tupleValue;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
+ public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
+ DummyCustomType dummyValue = new DummyCustomType();
+ dummyValue.setIntField(edge.getValue().intValue());
+ return dummyValue;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>,
+ DummyCustomParameterizedType<Double>> {
+
+ public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
+ DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+ dummyValue.setIntField(edge.getValue().intValue());
+ dummyValue.setTField(new Double(edge.getValue()));
+ return dummyValue;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
new file mode 100644
index 0000000..0d92fc9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapVerticesITCase extends MultipleProgramsTestBase {
+
+ public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testWithSameValue() throws Exception {
+ /*
+ * Test mapVertices() keeping the same value type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
+
+ mappedVertices.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,2\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,6\n";
+ }
+
+ @Test
+ public void testWithStringValue() throws Exception {
+ /*
+ * Test mapVertices() and change the value type to String
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
+
+ mappedVertices.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,one\n" +
+ "2,two\n" +
+ "3,three\n" +
+ "4,four\n" +
+ "5,five\n";
+ }
+
+ @Test
+ public void testWithtuple1Value() throws Exception {
+ /*
+ * Test mapVertices() and change the value type to a Tuple1
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
+
+ mappedVertices.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,(1)\n" +
+ "2,(2)\n" +
+ "3,(3)\n" +
+ "4,(4)\n" +
+ "5,(5)\n";
+ }
+
+ @Test
+ public void testWithCustomType() throws Exception {
+ /*
+ * Test mapVertices() and change the value type to a custom type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
+
+ mappedVertices.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,(T,1)\n" +
+ "2,(T,2)\n" +
+ "3,(T,3)\n" +
+ "4,(T,4)\n" +
+ "5,(T,5)\n";
+ }
+
+ @Test
+ public void testWithCustomParametrizedType() throws Exception {
+ /*
+ * Test mapVertices() and change the value type to a parameterized custom type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
+ new ToCustomParametrizedTypeMapper()).getVertices();
+
+ mappedVertices.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,(1.0,1)\n" +
+ "2,(2.0,2)\n" +
+ "3,(3.0,3)\n" +
+ "4,(4.0,4)\n" +
+ "5,(5.0,5)\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+ public Long map(Vertex<Long, Long> value) throws Exception {
+ return value.getValue()+1;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
+ public String map(Vertex<Long, Long> vertex) throws Exception {
+ String stringValue;
+ if (vertex.getValue() == 1) {
+ stringValue = "one";
+ }
+ else if (vertex.getValue() == 2) {
+ stringValue = "two";
+ }
+ else if (vertex.getValue() == 3) {
+ stringValue = "three";
+ }
+ else if (vertex.getValue() == 4) {
+ stringValue = "four";
+ }
+ else if (vertex.getValue() == 5) {
+ stringValue = "five";
+ }
+ else {
+ stringValue = "";
+ }
+ return stringValue;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
+ public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
+ Tuple1<Long> tupleValue = new Tuple1<Long>();
+ tupleValue.setFields(vertex.getValue());
+ return tupleValue;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
+ public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
+ DummyCustomType dummyValue = new DummyCustomType();
+ dummyValue.setIntField(vertex.getValue().intValue());
+ return dummyValue;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>,
+ DummyCustomParameterizedType<Double>> {
+
+ public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
+ DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+ dummyValue.setIntField(vertex.getValue().intValue());
+ dummyValue.setTField(new Double(vertex.getValue()));
+ return dummyValue;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
new file mode 100644
index 0000000..73dec2a
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
+
+ public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testLowestWeightOutNeighbor() throws Exception {
+ /*
+ * Get the lowest-weight out-neighbor
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+ verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testLowestWeightInNeighbor() throws Exception {
+ /*
+ * Get the lowest-weight in-neighbor
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+ verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,5\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,3\n" +
+ "5,3\n";
+ }
+
+ @Test
+ public void testMaxWeightEdge() throws Exception {
+ /*
+ * Get the maximum weight among all edges
+ * of a vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
+ graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+ verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,51\n" +
+ "2,23\n" +
+ "3,35\n" +
+ "4,45\n" +
+ "5,51\n";
+ }
+
+ @Test
+ public void testLowestWeightOutNeighborNoValue() throws Exception {
+ /*
+ * Get the lowest-weight out-neighbor
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
+ verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testLowestWeightInNeighborNoValue() throws Exception {
+ /*
+ * Get the lowest-weight in-neighbor
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+ verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,5\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,3\n" +
+ "5,3\n";
+ }
+
+ @Test
+ public void testMaxWeightAllNeighbors() throws Exception {
+ /*
+ * Get the maximum weight among all edges
+ * of a vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
+ graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
+ verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,51\n" +
+ "2,23\n" +
+ "3,35\n" +
+ "4,45\n" +
+ "5,51\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(
+ Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges) {
+
+ long weight = Long.MAX_VALUE;
+ long minNeighorId = 0;
+
+ for (Edge<Long, Long> edge: edges) {
+ if (edge.getValue() < weight) {
+ weight = edge.getValue();
+ minNeighorId = edge.getTarget();
+ }
+ }
+ return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges) {
+
+ long weight = Long.MIN_VALUE;
+
+ for (Edge<Long, Long> edge: edges) {
+ if (edge.getValue() > weight) {
+ weight = edge.getValue();
+ }
+ }
+ return new Tuple2<Long, Long>(v.getId(), weight);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+ long weight = Long.MAX_VALUE;
+ long minNeighorId = 0;
+ long vertexId = -1;
+ long i=0;
+
+ for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+ if (edge.f1.getValue() < weight) {
+ weight = edge.f1.getValue();
+ minNeighorId = edge.f1.getTarget();
+ }
+ if (i==0) {
+ vertexId = edge.f0;
+ } i++;
+ }
+ return new Tuple2<Long, Long>(vertexId, minNeighorId);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+ long weight = Long.MIN_VALUE;
+ long vertexId = -1;
+ long i=0;
+
+ for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+ if (edge.f1.getValue() > weight) {
+ weight = edge.f1.getValue();
+ }
+ if (i==0) {
+ vertexId = edge.f0;
+ } i++;
+ }
+ return new Tuple2<Long, Long>(vertexId, weight);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(
+ Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges) {
+
+ long weight = Long.MAX_VALUE;
+ long minNeighorId = 0;
+
+ for (Edge<Long, Long> edge: edges) {
+ if (edge.getValue() < weight) {
+ weight = edge.getValue();
+ minNeighorId = edge.getSource();
+ }
+ }
+ return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+ long weight = Long.MAX_VALUE;
+ long minNeighorId = 0;
+ long vertexId = -1;
+ long i=0;
+
+ for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+ if (edge.f1.getValue() < weight) {
+ weight = edge.f1.getValue();
+ minNeighorId = edge.f1.getSource();
+ }
+ if (i==0) {
+ vertexId = edge.f0;
+ } i++;
+ }
+ return new Tuple2<Long, Long>(vertexId, minNeighorId);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
new file mode 100644
index 0000000..c1e982f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
+
+ public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testSumOfOutNeighbors() throws Exception {
+ /*
+ * Get the sum of out-neighbor values
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,5\n" +
+ "2,3\n" +
+ "3,9\n" +
+ "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testSumOfInNeighbors() throws Exception {
+ /*
+ * Get the sum of in-neighbor values
+ * times the edge weights for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+
+ verticesWithSum.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,255\n" +
+ "2,12\n" +
+ "3,59\n" +
+ "4,102\n" +
+ "5,285\n";
+ }
+
+ @Test
+ public void testSumOfOAllNeighbors() throws Exception {
+ /*
+ * Get the sum of all neighbor values
+ * including own vertex value
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,11\n" +
+ "2,6\n" +
+ "3,15\n" +
+ "4,12\n" +
+ "5,13\n";
+ }
+
+ @Test
+ public void testSumOfOutNeighborsNoValue() throws Exception {
+ /*
+ * Get the sum of out-neighbor values
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,5\n" +
+ "2,3\n" +
+ "3,9\n" +
+ "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testSumOfInNeighborsNoValue() throws Exception {
+ /*
+ * Get the sum of in-neighbor values
+ * times the edge weights for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+
+ verticesWithSum.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,255\n" +
+ "2,12\n" +
+ "3,59\n" +
+ "4,102\n" +
+ "5,285\n";
+ }
+
+ @Test
+ public void testSumOfAllNeighborsNoValue() throws Exception {
+ /*
+ * Get the sum of all neighbor values
+ * for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,10\n" +
+ "2,4\n" +
+ "3,12\n" +
+ "4,8\n" +
+ "5,8\n";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+ sum += neighbor.f1.getValue();
+ }
+ return new Tuple2<Long, Long>(vertex.getId(), sum);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+ sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+ }
+ return new Tuple2<Long, Long>(vertex.getId(), sum);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+ sum += neighbor.f1.getValue();
+ }
+ return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(
+ Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+ Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+ neighbors.iterator();
+ while(neighborsIterator.hasNext()) {
+ next = neighborsIterator.next();
+ sum += next.f2.getValue();
+ }
+ return new Tuple2<Long, Long>(next.f0, sum);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(
+ Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+ Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+ neighbors.iterator();
+ while(neighborsIterator.hasNext()) {
+ next = neighborsIterator.next();
+ sum += next.f2.getValue() * next.f1.getValue();
+ }
+ return new Tuple2<Long, Long>(next.f0, sum);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ public Tuple2<Long, Long> iterateNeighbors(
+ Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ long sum = 0;
+ Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+ Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+ neighbors.iterator();
+ while(neighborsIterator.hasNext()) {
+ next = neighborsIterator.next();
+ sum += next.f2.getValue();
+ }
+ return new Tuple2<Long, Long>(next.f0, sum);
+ }
+ }
+}
\ No newline at end of file