You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:54 UTC
[16/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
new file mode 100644
index 0000000..bb3a131
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+ /**
+  * Verifies that a vertex-centric iteration configured with separate broadcast sets for
+  * the messaging and the update function is translated into a delta iteration that
+  * carries the configured name, parallelism, aggregator and broadcast sets.
+  *
+  * @throws Exception if the program cannot be built or inspected; JUnit reports it with the full stack trace
+  */
+ @Test
+ public void testTranslationPlainEdges() throws Exception {
+     final String ITERATION_NAME = "Test Name";
+
+     final String AGGREGATOR_NAME = "AggregatorName";
+
+     final String BC_SET_MESSAGES_NAME = "borat messages";
+
+     final String BC_SET_UPDATES_NAME = "borat updates";
+
+     final int NUM_ITERATIONS = 13;
+
+     final int ITERATION_PARALLELISM = 77;
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Long> bcMessaging = env.fromElements(1L);
+     DataSet<Long> bcUpdate = env.fromElements(1L);
+
+     DataSet<Vertex<String, Double>> result;
+
+     // ------------ construct the test program ------------------
+     {
+         DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+         DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+         // the plain (f0, f1) pairs get a NullValue as third field before the graph is built
+         Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+                 edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+
+                     @Override
+                     public Tuple3<String, String, NullValue> map(Tuple2<String, String> edge) {
+                         return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+                     }
+                 }), env);
+
+         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+         parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+         parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+         parameters.setName(ITERATION_NAME);
+         parameters.setParallelism(ITERATION_PARALLELISM);
+         parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+         result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+                 NUM_ITERATIONS, parameters).getVertices();
+
+         result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+     }
+
+     // ------------- validate the java program ----------------
+
+     assertTrue(result instanceof DeltaIterationResultSet);
+
+     DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+     DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+
+     // check the basic iteration properties
+     assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+     assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+     assertEquals(ITERATION_PARALLELISM, iteration.getParallelism());
+     assertEquals(ITERATION_NAME, iteration.getName());
+
+     // the aggregator registered through the configuration must be attached to the iteration
+     assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+     // validate that the semantic properties are set as they should
+     TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+
+     // the first input of the solution set join is expected to carry the messaging broadcast set
+     TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+
+     // validate that the broadcast sets are forwarded
+     assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+     assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+ }
+
+ /**
+  * Verifies the translation of a vertex-centric iteration when one and the same data set
+  * is registered as broadcast set for both the messaging and the update function: the
+  * resulting delta iteration must carry the configured properties and forward the set to both.
+  *
+  * @throws Exception if the program cannot be built or inspected; JUnit reports it with the full stack trace
+  */
+ @Test
+ public void testTranslationPlainEdgesWithForkedBroadcastVariable() throws Exception {
+     final String ITERATION_NAME = "Test Name";
+
+     final String AGGREGATOR_NAME = "AggregatorName";
+
+     final String BC_SET_MESSAGES_NAME = "borat messages";
+
+     final String BC_SET_UPDATES_NAME = "borat updates";
+
+     final int NUM_ITERATIONS = 13;
+
+     final int ITERATION_PARALLELISM = 77;
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Long> bcVar = env.fromElements(1L);
+
+     DataSet<Vertex<String, Double>> result;
+
+     // ------------ construct the test program ------------------
+     {
+         DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+         DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+         // the plain (f0, f1) pairs get a NullValue as third field before the graph is built
+         Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+                 edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+
+                     @Override
+                     public Tuple3<String, String, NullValue> map(Tuple2<String, String> edge) {
+                         return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+                     }
+                 }), env);
+
+         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+         parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+         parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+         parameters.setName(ITERATION_NAME);
+         parameters.setParallelism(ITERATION_PARALLELISM);
+         parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+         result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+                 NUM_ITERATIONS, parameters).getVertices();
+
+         result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+     }
+
+     // ------------- validate the java program ----------------
+
+     assertTrue(result instanceof DeltaIterationResultSet);
+
+     DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+     DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+
+     // check the basic iteration properties
+     assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+     assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+     assertEquals(ITERATION_PARALLELISM, iteration.getParallelism());
+     assertEquals(ITERATION_NAME, iteration.getName());
+
+     // the aggregator registered through the configuration must be attached to the iteration
+     assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+     // validate that the semantic properties are set as they should
+     TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+
+     // the first input of the solution set join is expected to carry the messaging broadcast set
+     TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+
+     // validate that the broadcast sets are forwarded
+     assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+     assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+ // No-op update function: the tests in this class only inspect the translated program and never execute it.
+ @Override
+ public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
+ }
+
+ public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, NullValue> {
+ // No-op messaging function for edges with NullValue values; the tests in this class never execute it.
+ @Override
+ public void sendMessages(Vertex<String, Double> vertex) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
new file mode 100644
index 0000000..3fbd0bc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CollectionModeSuperstepITCase {
+
+ /**
+  * Runs a dummy vertex-centric iteration on the collection-based environment to check that
+  * the superstep number is incremented correctly and can be read inside both the update and
+  * the messaging function. Every vertex starts with value 1 and adds 1 in each superstep.
+  */
+ @Test
+ public void testProgram() throws Exception {
+     ExecutionEnvironment collectionEnv = ExecutionEnvironment.createCollectionsEnvironment();
+
+     Graph<Long, Long, Long> input = Graph.fromCollection(
+             TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), collectionEnv);
+     Graph<Long, Long, Long> onesGraph = input.mapVertices(new AssignOneMapper());
+
+     Graph<Long, Long, Long> iterated = onesGraph.runVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
+
+     // the output is discarded: the actual checks are the assertions inside the two functions
+     iterated.getVertices()
+             .map(new VertexToTuple2Map<Long, Long>())
+             .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+     collectionEnv.execute();
+ }
+
+ /** Asserts that the vertex value equals the superstep number (both values are reported on failure), then adds 1. */
+ public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+     @Override
+     public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+         Assert.assertEquals(getSuperstepNumber(), vertex.getValue().longValue());
+         setNewVertexValue(vertex.getValue() + 1);
+     }
+ }
+
+ /** Asserts that the vertex value equals the superstep number, then sends the value to all neighbors. */
+ public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+     @Override
+     public void sendMessages(Vertex<Long, Long> vertex) {
+         Assert.assertEquals(getSuperstepNumber(), vertex.getValue().longValue());
+         // send message to keep vertices active
+         sendMessageToAllNeighbors(vertex.getValue());
+     }
+ }
+
+ /** Maps every vertex to the constant value 1, regardless of its current value. */
+ public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+     public Long map(Vertex<Long, Long> value) {
+         return 1L;
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
new file mode 100644
index 0000000..67d32a8
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase {
+
+ public GatherSumApplyConfigurationITCase(TestExecutionMode mode) {
+ super(mode); // passes the execution mode supplied by the Parameterized runner on to the test base class
+ }
+
+ private String expectedResult;
+
+ /**
+  * Tests {@code Graph.runGatherSumApplyIteration} when configuration parameters are provided:
+  * one broadcast set per user function, an aggregator and the number-of-vertices option.
+  */
+ @Test
+ public void testRunWithConfiguration() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // every vertex of the test graph starts with value 1
+     Graph<Long, Long, Long> onesGraph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new AssignOneMapper());
+
+     // create the configuration object
+     GSAConfiguration config = new GSAConfiguration();
+
+     config.addBroadcastSetForGatherFunction("gatherBcastSet", env.fromElements(1, 2, 3));
+     config.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6));
+     config.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9));
+     config.registerAggregator("superstepAggregator", new LongSumAggregator());
+     config.setOptNumVertices(true);
+
+     Graph<Long, Long, Long> iterated =
+             onesGraph.runGatherSumApplyIteration(new Gather(), new Sum(), new Apply(), 10, config);
+
+     List<Vertex<Long, Long>> vertices = iterated.getVertices().collect();
+
+     // Apply adds 1 in every superstep, so starting from 1 each vertex is expected to reach 11
+     expectedResult = "1,11\n" + "2,11\n" + "3,11\n"
+             + "4,11\n" + "5,11";
+
+     compareResultAsTuples(vertices, expectedResult);
+ }
+
+ /**
+  * Tests that the name, parallelism and solutionSetUnmanaged parameters of a
+  * {@link GSAConfiguration} are taken over by the iteration it is applied to.
+  */
+ @Test
+ public void testIterationConfiguration() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
+             .withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
+                     new DummySum(), new DummyApply(), 10);
+
+     GSAConfiguration parameters = new GSAConfiguration();
+     parameters.setName("gelly iteration");
+     parameters.setParallelism(2);
+     parameters.setSolutionSetUnmanagedMemory(true);
+
+     iteration.configure(parameters);
+
+     Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+     Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+     Assert.assertTrue(iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+     DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+     List<Vertex<Long, Long>> result = data.collect();
+
+     expectedResult = "1,11\n" +
+             "2,12\n" +
+             "3,13\n" +
+             "4,14\n" +
+             "5,15";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ /**
+  * Tests that the iteration behaves as before when no direction parameter is given. The
+  * default direction of a gather-sum-apply iteration is OUT, in which case information is
+  * gathered over the IN edges. With the gather, sum and apply functions used here, each
+  * vertex ends up with the set of vertices that have a path to it.
+  */
+ @Test
+ public void testIterationDefaultDirection() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // the test graph is used without its first edge
+     List<Edge<Long, Long>> remainingEdges = TestGraphUtils.getLongLongEdges();
+
+     remainingEdges.remove(0);
+
+     Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), remainingEdges, env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     Graph<Long, HashSet<Long>, Long> iterated = graph.runGatherSumApplyIteration(
+             new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4);
+
+     DataSet<Vertex<Long, HashSet<Long>>> reachable = iterated.getVertices();
+
+     List<Vertex<Long, HashSet<Long>>> result = reachable.collect();
+
+     expectedResult = "1,[1, 2, 3, 4, 5]\n"
+             + "2,[2]\n"
+             + "3,[1, 2, 3, 4, 5]\n"
+             + "4,[1, 2, 3, 4, 5]\n"
+             + "5,[1, 2, 3, 4, 5]\n";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ /**
+  * Tests the iteration with the direction parameter IN, in which case information is
+  * gathered over the OUT edges. With the gather, sum and apply functions used here, each
+  * vertex ends up with the set of vertices that can be reached from it by some path.
+  */
+ @Test
+ public void testIterationDirectionIN() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     GSAConfiguration config = new GSAConfiguration();
+     config.setDirection(EdgeDirection.IN);
+
+     // the test graph is used without its first edge
+     List<Edge<Long, Long>> remainingEdges = TestGraphUtils.getLongLongEdges();
+
+     remainingEdges.remove(0);
+
+     Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), remainingEdges, env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     Graph<Long, HashSet<Long>, Long> iterated = graph.runGatherSumApplyIteration(
+             new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(),
+             4, config);
+
+     DataSet<Vertex<Long, HashSet<Long>>> reachable = iterated.getVertices();
+
+     List<Vertex<Long, HashSet<Long>>> result = reachable.collect();
+
+     expectedResult = "1,[1, 3, 4, 5]\n"
+             + "2,[1, 2, 3, 4, 5]\n"
+             + "3,[1, 3, 4, 5]\n"
+             + "4,[1, 3, 4, 5]\n"
+             + "5,[1, 3, 4, 5]\n";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ /**
+  * Tests the iteration with the direction parameter ALL, in which case information is
+  * gathered over both the IN and the OUT edges. With the gather, sum and apply functions used
+  * here, each vertex ends up with the set of vertices it is connected to through some path.
+  */
+ @Test
+ public void testIterationDirectionALL() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     GSAConfiguration config = new GSAConfiguration();
+     config.setDirection(EdgeDirection.ALL);
+
+     // the test graph is used without its first edge
+     List<Edge<Long, Long>> remainingEdges = TestGraphUtils.getLongLongEdges();
+
+     remainingEdges.remove(0);
+
+     Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), remainingEdges, env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     Graph<Long, HashSet<Long>, Long> iterated = graph.runGatherSumApplyIteration(
+             new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(),
+             4, config);
+
+     DataSet<Vertex<Long, HashSet<Long>>> reachable = iterated.getVertices();
+
+     List<Vertex<Long, HashSet<Long>>> result = reachable.collect();
+
+     expectedResult = "1,[1, 2, 3, 4, 5]\n"
+             + "2,[1, 2, 3, 4, 5]\n"
+             + "3,[1, 2, 3, 4, 5]\n"
+             + "4,[1, 2, 3, 4, 5]\n"
+             + "5,[1, 2, 3, 4, 5]\n";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ /** Gather function that returns the neighbor value and checks the configured settings before each superstep. */
+ @SuppressWarnings("serial")
+ private static final class Gather extends GatherFunction<Long, Long, Long> {
+
+     @Override
+     public void preSuperstep() {
+         // test bcast variable: the set was created from plain ints, so its elements are Integers
+         @SuppressWarnings("unchecked")
+         List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("gatherBcastSet");
+         Assert.assertEquals(1, bcastSet.get(0).intValue());
+         Assert.assertEquals(2, bcastSet.get(1).intValue());
+         Assert.assertEquals(3, bcastSet.get(2).intValue());
+
+         // test aggregator
+         if (getSuperstepNumber() == 2) {
+             long aggrValue = ((LongValue) getPreviousIterationAggregate("superstepAggregator")).getValue();
+             Assert.assertEquals(7, aggrValue);
+         }
+
+         // test number of vertices (the test using this function enables the numVertices option)
+         Assert.assertEquals(5, getNumberOfVertices());
+     }
+
+     @Override
+     public Long gather(Neighbor<Long, Long> neighbor) {
+         return neighbor.getNeighborValue();
+     }
+ }
+
+ /** Sum function that always returns 0, adds the superstep number to the aggregator and checks the configured settings. */
+ @SuppressWarnings("serial")
+ private static final class Sum extends SumFunction<Long, Long, Long> {
+
+     LongSumAggregator aggregator = new LongSumAggregator();
+
+     @Override
+     public void preSuperstep() {
+         // test bcast variable: the set was created from plain ints, so its elements are Integers
+         @SuppressWarnings("unchecked")
+         List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("sumBcastSet");
+         Assert.assertEquals(4, bcastSet.get(0).intValue());
+         Assert.assertEquals(5, bcastSet.get(1).intValue());
+         Assert.assertEquals(6, bcastSet.get(2).intValue());
+
+         // test aggregator
+         aggregator = getIterationAggregator("superstepAggregator");
+
+         // test number of vertices
+         Assert.assertEquals(5, getNumberOfVertices());
+     }
+
+     public Long sum(Long newValue, Long currentValue) {
+         long superstep = getSuperstepNumber();
+         aggregator.aggregate(superstep);
+         return 0L;
+     }
+ }
+
+ /** Apply function that sets origValue + 1, adds the superstep number to the aggregator and checks the configured settings. */
+ @SuppressWarnings("serial")
+ private static final class Apply extends ApplyFunction<Long, Long, Long> {
+
+     LongSumAggregator aggregator = new LongSumAggregator();
+
+     @Override
+     public void preSuperstep() {
+         // test bcast variable: the set was created from plain ints, so its elements are Integers
+         @SuppressWarnings("unchecked")
+         List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("applyBcastSet");
+         Assert.assertEquals(7, bcastSet.get(0).intValue());
+         Assert.assertEquals(8, bcastSet.get(1).intValue());
+         Assert.assertEquals(9, bcastSet.get(2).intValue());
+
+         // test aggregator
+         aggregator = getIterationAggregator("superstepAggregator");
+
+         // test number of vertices
+         Assert.assertEquals(5, getNumberOfVertices());
+     }
+
+     public void apply(Long summedValue, Long origValue) {
+         long superstep = getSuperstepNumber();
+         aggregator.aggregate(superstep);
+         setResult(origValue + 1);
+     }
+ }
+
+ /** Gather function of the iteration-configuration test; forwards the neighbor value unchanged. */
+ @SuppressWarnings("serial")
+ private static final class DummyGather extends GatherFunction<Long, Long, Long> {
+     @Override
+     public void preSuperstep() {
+         // the numVertices option is not set for this function, in which case -1 is returned
+         Assert.assertEquals(-1, getNumberOfVertices());
+     }
+
+     public Long gather(Neighbor<Long, Long> neighbor) {
+         final Long neighborValue = neighbor.getNeighborValue();
+         return neighborValue;
+     }
+ }
+
+ /** Sum function of the iteration-configuration test; ignores both inputs and always returns 0. */
+ @SuppressWarnings("serial")
+ private static final class DummySum extends SumFunction<Long, Long, Long> {
+     public Long sum(Long newValue, Long currentValue) {
+         return 0L;
+     }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DummyApply extends ApplyFunction<Long, Long, Long> {
+ // Ignores the summed value and sets the result to origValue plus one.
+ public void apply(Long summedValue, Long origValue) {
+ setResult(origValue + 1);
+ }
+ }
+
+ /** Maps every vertex to the constant value 1, regardless of its current value. */
+ @SuppressWarnings("serial")
+ public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+     public Long map(Vertex<Long, Long> value) {
+         return 1L;
+     }
+ }
+
+ /** Maps each vertex to a new set that contains only the id of that vertex. */
+ @SuppressWarnings("serial")
+ public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+     @Override
+     public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+         final HashSet<Long> ownIdOnly = new HashSet<Long>();
+         ownIdOnly.add(value.getId());
+         return ownIdOnly;
+     }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
+ // Gathers the neighbor's value (a set of Longs) without modifying it.
+ @Override
+ public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ }
+
+ /** Sum function that merges two sets of Longs into their union. */
+ @SuppressWarnings("serial")
+ private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
+     @Override
+     public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
+         // the union is accumulated in currentSet itself (no copy is made) and that same
+         // instance is returned to the caller
+         currentSet.addAll(newSet);
+         return currentSet;
+     }
+ }
+
+ /** Apply function that merges the current set into the gathered one and sets a result only if the merged set is larger. */
+ @SuppressWarnings("serial")
+ private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
+     @Override
+     public void apply(HashSet<Long> newValue, HashSet<Long> currentValue) {
+         newValue.addAll(currentValue);
+         if (currentValue.size() < newValue.size()) {
+             setResult(newValue);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..0213f02
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+
+ public GatherSumApplyITCase(TestExecutionMode mode){
+ super(mode); // passes the execution mode supplied by the Parameterized runner on to the test base class
+ }
+
+ private String expectedResult;
+
+ // --------------------------------------------------------------------------------------------
+ // Connected Components Test
+ // --------------------------------------------------------------------------------------------
+
+ /** Runs {@link GSAConnectedComponents} (constructed with the argument 16) on the default edge data set. */
+ @Test
+ public void testConnectedComponents() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     Graph<Long, Long, NullValue> ccGraph = Graph.fromDataSet(
+             ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env), new InitMapperCC(), env);
+
+     GSAConnectedComponents<Long, NullValue> algorithm = new GSAConnectedComponents<Long, NullValue>(16);
+     List<Vertex<Long, Long>> components = ccGraph.run(algorithm).collect();
+
+     expectedResult = "1,1\n" +
+             "2,1\n" +
+             "3,1\n" +
+             "4,1\n";
+
+     compareResultAsTuples(components, expectedResult);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path Test
+ // --------------------------------------------------------------------------------------------
+
+ /** Runs {@link GSASingleSourceShortestPaths} (first argument 1, presumably the source vertex id) on the default edge data set. */
+ @Test
+ public void testSingleSourceShortestPaths() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+             SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitMapperSSSP(), env);
+
+     List<Vertex<Long, Double>> result = inputGraph.run(
+             new GSASingleSourceShortestPaths<Long>(1L, 16)).collect();
+
+     expectedResult = "1,0.0\n" +
+             "2,12.0\n" +
+             "3,13.0\n" +
+             "4,47.0\n" +
+             "5,48.0\n";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitMapperCC implements MapFunction<Long, Long> {
+ public Long map(Long value) {
+ return value; // identity: the input is returned unchanged
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 0.0; // every input is mapped to the same value 0.0
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
new file mode 100644
index 0000000..294926f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+public class TestGraphUtils {
+
+ /**
+  * Wraps the vertices produced by {@link #getLongLongVertices()} in a DataSet.
+  *
+  * @param env environment used to create the DataSet
+  * @return DataSet over the default vertex list
+  */
+ public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
+         ExecutionEnvironment env) {
+     final List<Vertex<Long, Long>> vertices = getLongLongVertices();
+     return env.fromCollection(vertices);
+ }
+
+ /**
+  * Wraps the edges produced by {@link #getLongLongEdges()} in a DataSet.
+  *
+  * @param env environment used to create the DataSet
+  * @return DataSet over the default edge list
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> edges = getLongLongEdges();
+     return env.fromCollection(edges);
+ }
+
+ /**
+  * Default edge list with the element at index 1 removed and the edge
+  * (13 -> 3, value 13) appended; source id 13 does not occur in
+  * {@link #getLongLongVertices()}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> modified = getLongLongEdges();
+     final Edge<Long, Long> unknownSource = new Edge<Long, Long>(13L, 3L, 13L);
+
+     modified.remove(1);
+     modified.add(unknownSource);
+
+     return env.fromCollection(modified);
+ }
+
+ /**
+  * Default edge list with the element at index 0 removed and the edge
+  * (3 -> 13, value 13) appended; target id 13 does not occur in
+  * {@link #getLongLongVertices()}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> modified = getLongLongEdges();
+     final Edge<Long, Long> unknownTarget = new Edge<Long, Long>(3L, 13L, 13L);
+
+     modified.remove(0);
+     modified.add(unknownTarget);
+
+     return env.fromCollection(modified);
+ }
+
+ /**
+  * Default edge list with three edges removed and three edges appended whose
+  * source and/or target ids (12, 13, 33) do not occur in {@link #getLongLongVertices()}.
+  *
+  * The removals use indices 0, 1, 2 one after another on the shrinking list, so the
+  * edges dropped are the ones originally at positions 0, 2 and 4.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> modified = getLongLongEdges();
+
+     // same as remove(0); remove(1); remove(2) -- indices shift after every removal
+     for (int index = 0; index < 3; index++) {
+         modified.remove(index);
+     }
+
+     final long[][] invalid = { {13L, 3L, 13L}, {1L, 12L, 12L}, {13L, 33L, 13L} };
+     for (long[] e : invalid) {
+         modified.add(new Edge<Long, Long>(e[0], e[1], e[2]));
+     }
+     return env.fromCollection(modified);
+ }
+
+ /**
+  * Seven edges with String ids and Long values, mirroring the default Long/Long edge list.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
+         ExecutionEnvironment env) {
+     final String[][] endpoints = {
+         {"1", "2"}, {"1", "3"}, {"2", "3"}, {"3", "4"}, {"3", "5"}, {"4", "5"}, {"5", "1"}
+     };
+     final long[] values = {12L, 13L, 23L, 34L, 35L, 45L, 51L};
+
+     List<Edge<String, Long>> result = new ArrayList<Edge<String, Long>>();
+     for (int i = 0; i < endpoints.length; i++) {
+         result.add(new Edge<String, Long>(endpoints[i][0], endpoints[i][1], values[i]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Five (key, value) pairs: (1,10), (2,20), (3,30), (4,40), (6,60).
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
+         ExecutionEnvironment env) {
+     final long[][] pairs = { {1L, 10L}, {2L, 20L}, {3L, 30L}, {4L, 40L}, {6L, 60L} };
+
+     List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
+     for (long[] p : pairs) {
+         result.add(new Tuple2<Long, Long>(p[0], p[1]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Seven (key, value) pairs with first fields 1, 1, 2, 3, 3, 4, 6 and values 10..70.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
+         ExecutionEnvironment env) {
+     final long[][] pairs = {
+         {1L, 10L}, {1L, 20L}, {2L, 30L}, {3L, 40L}, {3L, 50L}, {4L, 60L}, {6L, 70L}
+     };
+
+     List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
+     for (long[] p : pairs) {
+         result.add(new Tuple2<Long, Long>(p[0], p[1]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Seven (key, value) pairs with first fields 2, 3, 3, 4, 6, 6, 1 and values 10..70.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
+         ExecutionEnvironment env) {
+     final long[][] pairs = {
+         {2L, 10L}, {3L, 20L}, {3L, 30L}, {4L, 40L}, {6L, 50L}, {6L, 60L}, {1L, 70L}
+     };
+
+     List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
+     for (long[] p : pairs) {
+         result.add(new Tuple2<Long, Long>(p[0], p[1]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Seven Long triples: (1,2,12), (1,3,13), (2,3,23), (3,4,34), (3,6,36), (4,6,46), (6,1,61).
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
+         ExecutionEnvironment env) {
+     final long[][] triples = {
+         {1L, 2L, 12L}, {1L, 3L, 13L}, {2L, 3L, 23L}, {3L, 4L, 34L},
+         {3L, 6L, 36L}, {4L, 6L, 46L}, {6L, 1L, 61L}
+     };
+
+     List<Tuple3<Long, Long, Long>> result = new ArrayList<Tuple3<Long, Long, Long>>();
+     for (long[] t : triples) {
+         result.add(new Tuple3<Long, Long, Long>(t[0], t[1], t[2]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Four pairs with keys 1..4; the i-th value is
+  * {@code DummyCustomParameterizedType<Float>(10*i, 10f*i)}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
+         ExecutionEnvironment env) {
+     List<Tuple2<Long, DummyCustomParameterizedType<Float>>> result =
+             new ArrayList<Tuple2<Long, DummyCustomParameterizedType<Float>>>();
+
+     for (int i = 1; i <= 4; i++) {
+         final int scaled = 10 * i;
+         result.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>((long) i,
+                 new DummyCustomParameterizedType<Float>(scaled, (float) scaled)));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Four pairs with keys 1, 1, 2, 3; the i-th value (1-based) is
+  * {@code DummyCustomParameterizedType<Float>(10*i, 10f*i)}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
+         ExecutionEnvironment env) {
+     final long[] keys = {1L, 1L, 2L, 3L};
+     List<Tuple2<Long, DummyCustomParameterizedType<Float>>> result =
+             new ArrayList<Tuple2<Long, DummyCustomParameterizedType<Float>>>();
+
+     for (int i = 0; i < keys.length; i++) {
+         final int scaled = 10 * (i + 1);
+         result.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(keys[i],
+                 new DummyCustomParameterizedType<Float>(scaled, (float) scaled)));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Four pairs with keys 2, 3, 3, 4; the i-th value (1-based) is
+  * {@code DummyCustomParameterizedType<Float>(10*i, 10f*i)}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
+         ExecutionEnvironment env) {
+     final long[] keys = {2L, 3L, 3L, 4L};
+     List<Tuple2<Long, DummyCustomParameterizedType<Float>>> result =
+             new ArrayList<Tuple2<Long, DummyCustomParameterizedType<Float>>>();
+
+     for (int i = 0; i < keys.length; i++) {
+         final int scaled = 10 * (i + 1);
+         result.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(keys[i],
+                 new DummyCustomParameterizedType<Float>(scaled, (float) scaled)));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Four triples (1,2), (1,3), (2,3), (3,4); the i-th third field (1-based) is
+  * {@code DummyCustomParameterizedType<Float>(10*i, 10f*i)}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
+         ExecutionEnvironment env) {
+     final long[][] endpoints = { {1L, 2L}, {1L, 3L}, {2L, 3L}, {3L, 4L} };
+     List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> result =
+             new ArrayList<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>();
+
+     for (int i = 0; i < endpoints.length; i++) {
+         final int scaled = 10 * (i + 1);
+         result.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(
+                 endpoints[i][0], endpoints[i][1],
+                 new DummyCustomParameterizedType<Float>(scaled, (float) scaled)));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Vertex data with an invalid id: the default vertex list with its first element
+  * removed and the vertex (15, 1) appended.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
+         ExecutionEnvironment env) {
+     final List<Vertex<Long, Long>> modified = getLongLongVertices();
+     final Vertex<Long, Long> stranger = new Vertex<Long, Long>(15L, 1L);
+
+     modified.remove(0);
+     modified.add(stranger);
+
+     return env.fromCollection(modified);
+ }
+
+ /**
+  * Edge data in which some vertex has a zero degree: in this list vertex 1 never
+  * appears as a target and vertex 5 never appears as a source.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
+         ExecutionEnvironment env) {
+     final long[][] triples = {
+         {1L, 2L, 12L}, {1L, 4L, 14L}, {1L, 5L, 15L},
+         {2L, 3L, 23L}, {3L, 5L, 35L}, {4L, 5L, 45L}
+     };
+
+     List<Edge<Long, Long>> result = new ArrayList<Edge<Long, Long>>();
+     for (long[] t : triples) {
+         result.add(new Edge<Long, Long>(t[0], t[1], t[2]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Builds a fresh, mutable list of the five default vertices; vertex i (1..5)
+  * carries the value i.
+  *
+  * @return a new ArrayList on every call
+  */
+ public static final List<Vertex<Long, Long>> getLongLongVertices() {
+     List<Vertex<Long, Long>> result = new ArrayList<Vertex<Long, Long>>();
+     for (long id = 1L; id <= 5L; id++) {
+         result.add(new Vertex<Long, Long>(id, id));
+     }
+     return result;
+ }
+
+ /**
+  * Builds a fresh, mutable list of five vertices with ids 1..5, each carrying {@code true}.
+  *
+  * @return a new ArrayList on every call
+  */
+ public static final List<Vertex<Long, Boolean>> getLongBooleanVertices() {
+     List<Vertex<Long, Boolean>> result = new ArrayList<Vertex<Long, Boolean>>();
+     for (long id = 1L; id <= 5L; id++) {
+         result.add(new Vertex<Long, Boolean>(id, Boolean.TRUE));
+     }
+     return result;
+ }
+
+ /**
+  * Four edges forming two groups that share no vertex: {1, 2, 3} and {4, 5}.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(
+         ExecutionEnvironment env) {
+     final long[][] triples = { {1L, 2L, 12L}, {1L, 3L, 13L}, {2L, 3L, 23L}, {4L, 5L, 45L} };
+
+     List<Edge<Long, Long>> result = new ArrayList<Edge<Long, Long>>();
+     for (long[] t : triples) {
+         result.add(new Edge<Long, Long>(t[0], t[1], t[2]));
+     }
+     return env.fromCollection(result);
+ }
+
+ /**
+  * Builds a fresh, mutable list of the seven default edges
+  * (source, target, value), in insertion order:
+  * (1,2,12), (1,3,13), (2,3,23), (3,4,34), (3,5,35), (4,5,45), (5,1,51).
+  *
+  * @return a new ArrayList on every call
+  */
+ public static final List<Edge<Long, Long>> getLongLongEdges() {
+     final long[][] triples = {
+         {1L, 2L, 12L}, {1L, 3L, 13L}, {2L, 3L, 23L}, {3L, 4L, 34L},
+         {3L, 5L, 35L}, {4L, 5L, 45L}, {5L, 1L, 51L}
+     };
+
+     List<Edge<Long, Long>> result = new ArrayList<Edge<Long, Long>>();
+     for (long[] t : triples) {
+         result.add(new Edge<Long, Long>(t[0], t[1], t[2]));
+     }
+     return result;
+ }
+
+ /**
+  * Simple serializable value holder with an int and a boolean field.
+  * {@link #toString()} renders it as {@code (T,<int>)} or {@code (F,<int>)}.
+  */
+ public static class DummyCustomType implements Serializable {
+     private static final long serialVersionUID = 1L;
+
+     private int intField;
+     private boolean booleanField;
+
+     /** Creates an instance with intField 0 and booleanField true. */
+     public DummyCustomType() {
+         this(0, true);
+     }
+
+     /** Creates an instance holding the two given values. */
+     public DummyCustomType(int intF, boolean boolF) {
+         intField = intF;
+         booleanField = boolF;
+     }
+
+     public boolean getBooleanField() {
+         return this.booleanField;
+     }
+
+     public int getIntField() {
+         return this.intField;
+     }
+
+     public void setIntField(int intF) {
+         intField = intF;
+     }
+
+     @Override
+     public String toString() {
+         final String flag = booleanField ? "T" : "F";
+         return "(" + flag + "," + intField + ")";
+     }
+ }
+
+ /**
+  * Serializable value holder with an int field and a field of the type parameter {@code T}.
+  *
+  * @param <T> type of the generic field; the no-arg constructor leaves that field {@code null}
+  */
+ public static class DummyCustomParameterizedType<T> implements Serializable {
+     private static final long serialVersionUID = 1L;
+
+     private int intField;
+     private T tField;
+
+     /** Creates an instance holding the two given values. */
+     public DummyCustomParameterizedType(int intF, T tF) {
+         this.intField = intF;
+         this.tField = tF;
+     }
+
+     /** Creates an instance with intField 0 and a {@code null} generic field. */
+     public DummyCustomParameterizedType() {
+         this.intField = 0;
+         this.tField = null;
+     }
+
+     public int getIntField() {
+         return intField;
+     }
+
+     public void setIntField(int intF) {
+         this.intField = intF;
+     }
+
+     public void setTField(T tF) {
+         this.tField = tF;
+     }
+
+     public T getTField() {
+         return tField;
+     }
+
+     /**
+      * Renders the instance as {@code (<tField>,<intField>)}.
+      * A {@code null} generic field is printed as {@code null}; the previous
+      * {@code tField.toString()} call threw a NullPointerException in that case.
+      */
+     @Override
+     public String toString() {
+         // string concatenation is null-safe, an explicit toString() call is not
+         return "(" + tField + "," + intField + ")";
+     }
+ }
+
+ /**
+  * Silences standard output by replacing {@code System.out} with a stream
+  * that discards everything written to it. The previous stream is not saved.
+  */
+ public static void pipeSystemOutToNull() {
+     final PrintStream silent = new PrintStream(new BlackholeOutputSteam());
+     System.setOut(silent);
+ }
+
+ private static final class BlackholeOutputSteam extends java.io.OutputStream {
+ @Override
+ public void write(int b){}
+ }
+
+ /**
+  * Edge data of the second graph used when testing the method difference();
+  * wraps {@link #getLongLongEdgesForDifference()} in a DataSet.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataDifference(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> edges = getLongLongEdgesForDifference();
+     return env.fromCollection(edges);
+ }
+
+ /**
+  * Wraps {@link #getLongLongEdgesForDifference2()} in a DataSet.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataDifference2(
+         ExecutionEnvironment env) {
+     final List<Edge<Long, Long>> edges = getLongLongEdgesForDifference2();
+     return env.fromCollection(edges);
+ }
+
+ /**
+  * Wraps {@link #getVerticesForDifference()} in a DataSet.
+  *
+  * @param env environment used to create the DataSet
+  */
+ public static final DataSet<Vertex<Long, Long>> getLongLongVertexDataDifference(
+         ExecutionEnvironment env) {
+     final List<Vertex<Long, Long>> vertices = getVerticesForDifference();
+     return env.fromCollection(vertices);
+ }
+
+ /**
+  * Builds a fresh list of three vertices with ids 1, 3 and 6; each value equals its id.
+  */
+ public static final List<Vertex<Long, Long>> getVerticesForDifference() {
+     final long[] ids = {1L, 3L, 6L};
+
+     List<Vertex<Long, Long>> result = new ArrayList<Vertex<Long, Long>>();
+     for (long id : ids) {
+         result.add(new Vertex<Long, Long>(id, id));
+     }
+     return result;
+ }
+
+ /**
+  * Builds a fresh list of three edges (source, target, value):
+  * (1,3,13), (1,6,26), (6,3,63).
+  */
+ public static final List<Edge<Long, Long>> getLongLongEdgesForDifference() {
+     final long[][] triples = { {1L, 3L, 13L}, {1L, 6L, 26L}, {6L, 3L, 63L} };
+
+     List<Edge<Long, Long>> result = new ArrayList<Edge<Long, Long>>();
+     for (long[] t : triples) {
+         result.add(new Edge<Long, Long>(t[0], t[1], t[2]));
+     }
+     return result;
+ }
+
+ /**
+  * Builds a fresh list holding the single self-loop edge (6,6,66).
+  */
+ public static final List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
+     final Edge<Long, Long> selfLoop = new Edge<Long, Long>(6L, 6L, 66L);
+
+     List<Edge<Long, Long>> result = new ArrayList<Edge<Long, Long>>();
+     result.add(selfLoop);
+     return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
new file mode 100644
index 0000000..0feb3fb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+
+@RunWith(Parameterized.class)
+public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
+
+ /**
+  * Creates the test case for one execution mode; the mode is handed straight
+  * to the {@code MultipleProgramsTestBase} constructor.
+  */
+ public VertexCentricConfigurationITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String expectedResult;
+
+ /**
+  * Test Graph's runVertexCentricIteration when configuration parameters are provided:
+  * two broadcast sets, an aggregator and the number-of-vertices option. The checks on
+  * those values live in {@code UpdateFunction} and {@code MessageFunction}.
+  */
+ @Test
+ public void testRunWithConfiguration() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new AssignOneMapper());
+
+     // create the configuration object
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
+     config.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
+     config.registerAggregator("superstepAggregator", new LongSumAggregator());
+     config.setOptNumVertices(true);
+
+     final Graph<Long, Long, Long> iterated = graph.runVertexCentricIteration(
+             new UpdateFunction(), new MessageFunction(), 10, config);
+
+     final List<Vertex<Long, Long>> collected = iterated.getVertices().collect();
+
+     expectedResult = "1,11\n"
+             + "2,11\n"
+             + "3,11\n"
+             + "4,11\n"
+             + "5,11";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test name, parallelism and solutionSetUnmanaged parameters: they are set on a
+  * configuration, applied to a VertexCentricIteration, read back, and the iteration
+  * is then run on the default vertex data.
+  */
+ @Test
+ public void testIterationConfiguration() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final VertexCentricIteration<Long, Long, Long, Long> iteration =
+             VertexCentricIteration.withEdges(
+                     TestGraphUtils.getLongLongEdgeData(env),
+                     new DummyUpdateFunction(),
+                     new DummyMessageFunction(),
+                     10);
+
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setName("gelly iteration");
+     config.setParallelism(2);
+     config.setSolutionSetUnmanagedMemory(true);
+     iteration.configure(config);
+
+     // the iteration must report exactly what was configured
+     Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+     Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+     Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+     final List<Vertex<Long, Long>> collected =
+             TestGraphUtils.getLongLongVertexData(env).runOperation(iteration).collect();
+
+     expectedResult = "1,11\n"
+             + "2,12\n"
+             + "3,13\n"
+             + "4,14\n"
+             + "5,15";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test Graph's runVertexCentricIteration when configuration parameters are not provided
+  * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT.
+  */
+ @Test
+ public void testDefaultConfiguration() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new AssignOneMapper());
+
+     final Graph<Long, Long, Long> iterated = graph.runVertexCentricIteration(
+             new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
+
+     final DataSet<Tuple2<Long, Long>> asTuples =
+             iterated.getVertices().map(new VertexToTuple2Map<Long, Long>());
+     final List<Tuple2<Long, Long>> collected = asTuples.collect();
+
+     expectedResult = "1,6\n"
+             + "2,6\n"
+             + "3,6\n"
+             + "4,6\n"
+             + "5,6";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if no direction parameter is given, the iteration works as before
+  * (i.e. it collects messages from the in-neighbors and sends them to the out-neighbors)
+  */
+ @Test
+ public void testIterationDefaultDirection() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     final List<Vertex<Long, HashSet<Long>>> collected = graph
+             .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,[5]\n"
+             + "2,[1]\n"
+             + "3,[1, 2]\n"
+             + "4,[3]\n"
+             + "5,[3, 4]";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the direction parameter is set to IN,
+  * messages are collected from the out-neighbors and sent to the in-neighbors.
+  */
+ @Test
+ public void testIterationINDirection() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     // configure the iteration
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setDirection(EdgeDirection.IN);
+
+     final List<Vertex<Long, HashSet<Long>>> collected = graph
+             .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, config)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,[2, 3]\n"
+             + "2,[3]\n"
+             + "3,[4, 5]\n"
+             + "4,[5]\n"
+             + "5,[1]";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the direction parameter is set to ALL,
+  * messages are collected from all the neighbors and sent to all the neighbors.
+  */
+ @Test
+ public void testIterationALLDirection() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, HashSet<Long>, Long> graph = Graph
+             .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+             .mapVertices(new InitialiseHashSetMapper());
+
+     // configure the iteration
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setDirection(EdgeDirection.ALL);
+
+     final List<Vertex<Long, HashSet<Long>>> collected = graph
+             .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, config)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,[2, 3, 5]\n"
+             + "2,[1, 3]\n"
+             + "3,[1, 2, 4, 5]\n"
+             + "4,[3, 5]\n"
+             + "5,[1, 3, 4]";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the number of vertices option is not set, -1 is returned as value.
+  */
+ @Test
+ public void testNumVerticesNotSet() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph.fromCollection(
+             TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+     // no configuration object is passed, so the option stays at its default
+     final List<Vertex<Long, Long>> collected = graph
+             .runVertexCentricIteration(new UpdateFunctionNumVertices(), new DummyMessageFunction(), 2)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,-1\n"
+             + "2,-1\n"
+             + "3,-1\n"
+             + "4,-1\n"
+             + "5,-1";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the degrees are set, they can be accessed in every superstep
+  * inside the update function and the value
+  * is correctly computed for degrees in the messaging function.
+  */
+ @Test
+ public void testInDegreesSet() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph.fromCollection(
+             TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+     // configure the iteration
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setOptDegrees(true);
+
+     final List<Vertex<Long, Long>> collected = graph
+             .runVertexCentricIteration(new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, config)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,1\n"
+             + "2,1\n"
+             + "3,2\n"
+             + "4,1\n"
+             + "5,2";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the degrees option is not set, then -1 is returned as a value for in-degree.
+  */
+ @Test
+ public void testInDegreesNotSet() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph.fromCollection(
+             TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+     // no configuration object is passed, so the degrees option stays at its default
+     final List<Vertex<Long, Long>> collected = graph
+             .runVertexCentricIteration(new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,-1\n"
+             + "2,-1\n"
+             + "3,-1\n"
+             + "4,-1\n"
+             + "5,-1";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the degrees are set, they can be accessed in every superstep
+  * inside the update function and the value
+  * is correctly computed for degrees in the messaging function.
+  */
+ @Test
+ public void testOutDegreesSet() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Long, Long> graph = Graph.fromCollection(
+             TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+     // configure the iteration
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setOptDegrees(true);
+
+     final List<Vertex<Long, Long>> collected = graph
+             .runVertexCentricIteration(new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, config)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,2\n"
+             + "2,1\n"
+             + "3,2\n"
+             + "4,1\n"
+             + "5,1";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Test that if the degrees option is not set, then -1 is returned as a value for out-degree.
+  *
+  * Uses {@code UpdateFunctionOutDegrees}; the earlier version ran
+  * {@code UpdateFunctionInDegrees} here, which made this test a duplicate of
+  * {@code testInDegreesNotSet} and left the out-degree path unchecked.
+  */
+ @Test
+ public void testOutDegreesNotSet() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+             TestGraphUtils.getLongLongEdges(), env);
+
+     // no configuration object is passed, so the degrees option stays at its default
+     DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+             new UpdateFunctionOutDegrees(), new DummyMessageFunction(), 2).getVertices();
+
+     List<Vertex<Long, Long>> result = verticesWithDegrees.collect();
+
+     expectedResult = "1,-1\n" +
+             "2,-1\n" +
+             "3,-1\n" +
+             "4,-1\n" +
+             "5,-1";
+
+     compareResultAsTuples(result, expectedResult);
+ }
+
+ /**
+  * Compute the number of neighbors in a vertex - centric manner, and verify that it is equal to
+  * the sum: inDegree + outDegree.
+  */
+ @Test
+ public void testDirectionALLAndDegrees() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     final Graph<Long, Boolean, Long> graph = Graph.fromCollection(
+             TestGraphUtils.getLongBooleanVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+     // configure the iteration
+     final VertexCentricConfiguration config = new VertexCentricConfiguration();
+     config.setOptDegrees(true);
+     config.setDirection(EdgeDirection.ALL);
+
+     final List<Vertex<Long, Boolean>> collected = graph
+             .runVertexCentricIteration(new VertexUpdateNumNeighbors(), new IdMessenger(), 1, config)
+             .getVertices()
+             .collect();
+
+     expectedResult = "1,true\n"
+             + "2,true\n"
+             + "3,true\n"
+             + "4,true\n"
+             + "5,true";
+
+     compareResultAsTuples(collected, expectedResult);
+ }
+
+ /**
+  * Update function used by testRunWithConfiguration: before each superstep it checks the
+  * "updateBcastSet" broadcast set and the number of vertices, and fetches the
+  * "superstepAggregator"; for each vertex it aggregates the superstep number and
+  * increments the vertex value by one.
+  */
+ @SuppressWarnings("serial")
+ public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+ // replaced in preSuperstep() by the aggregator registered for the iteration
+ LongSumAggregator aggregator = new LongSumAggregator();
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ // NOTE(review): the test registers env.fromElements(1, 2, 3) under this name, so the
+ // elements are presumably Integer rather than Tuple1<Integer>; the assertions below
+ // seem to rely on the elements being passed as Object without a cast -- confirm
+ // before changing the declared type or storing an element in a typed local.
+ @SuppressWarnings("unchecked")
+ List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+ Assert.assertEquals(1, bcastSet.get(0));
+ Assert.assertEquals(2, bcastSet.get(1));
+ Assert.assertEquals(3, bcastSet.get(2));
+
+ // test aggregator
+ aggregator = getIterationAggregator("superstepAggregator");
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
+
+ }
+
+ @Override
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+ // add the current superstep number to the aggregator once per updated vertex
+ long superstep = getSuperstepNumber();
+ aggregator.aggregate(superstep);
+
+ setNewVertexValue(vertex.getValue() + 1);
+ }
+ }
+
+ /**
+  * Update function for the default-configuration test: asserts that the number of
+  * vertices and both degrees are reported as -1, then increments the vertex value by one.
+  */
+ @SuppressWarnings("serial")
+ public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
+
+     // not read anywhere inside this class
+     LongSumAggregator aggregator = new LongSumAggregator();
+
+     @Override
+     public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+         final long notSet = -1L;
+
+         // test number of vertices
+         Assert.assertEquals(notSet, getNumberOfVertices());
+
+         // test degrees
+         Assert.assertEquals(notSet, getInDegree());
+         Assert.assertEquals(notSet, getOutDegree());
+
+         final long incremented = vertex.getValue() + 1;
+         setNewVertexValue(incremented);
+     }
+ }
+
+ /**
+  * Messaging function used by testRunWithConfiguration: before each superstep it checks
+  * the "messagingBcastSet" broadcast set, the number of vertices and, in superstep 2,
+  * the previous value of "superstepAggregator"; it then sends each vertex value to all
+  * neighbors.
+  */
+ @SuppressWarnings("serial")
+ public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ // NOTE(review): the test registers env.fromElements(4, 5, 6) under this name, so the
+ // elements are presumably Integer rather than Tuple1<Integer>; the assertions below
+ // seem to rely on the elements being passed as Object without a cast -- confirm
+ // before changing the declared type or storing an element in a typed local.
+ @SuppressWarnings("unchecked")
+ List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+ Assert.assertEquals(4, bcastSet.get(0));
+ Assert.assertEquals(5, bcastSet.get(1));
+ Assert.assertEquals(6, bcastSet.get(2));
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
+
+ // test aggregator
+ // presumably 5 = five vertices each aggregating superstep number 1 in UpdateFunction -- TODO confirm
+ if (getSuperstepNumber() == 2) {
+ long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+ Assert.assertEquals(5, aggrValue);
+ }
+ }
+
+ @Override
+ public void sendMessages(Vertex<Long, Long> vertex) {
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+ }
+
+ /**
+  * Messaging function for the default-configuration test: asserts that the number of
+  * vertices and both degrees are reported as -1, then sends the vertex value to all
+  * neighbors.
+  */
+ @SuppressWarnings("serial")
+ public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> {
+
+     @Override
+     public void sendMessages(Vertex<Long, Long> vertex) {
+         final long notSet = -1L;
+
+         // test number of vertices
+         Assert.assertEquals(notSet, getNumberOfVertices());
+
+         // test degrees
+         Assert.assertEquals(notSet, getInDegree());
+         Assert.assertEquals(notSet, getOutDegree());
+
+         //send message to keep vertices active
+         final Long currentValue = vertex.getValue();
+         sendMessageToAllNeighbors(currentValue);
+     }
+ }
+
+    /** Vertex update function that stores the reported number of vertices as the new vertex value. */
+    @SuppressWarnings("serial")
+    public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+            final long vertexCount = getNumberOfVertices();
+            setNewVertexValue(vertexCount);
+        }
+    }
+
+    /** Vertex update function that ignores the incoming messages and increments the vertex value by one. */
+    @SuppressWarnings("serial")
+    public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+            final long incremented = vertex.getValue() + 1;
+            setNewVertexValue(incremented);
+        }
+    }
+
+    /** Messaging function that sends the current vertex value to all neighbors. */
+    @SuppressWarnings("serial")
+    public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, Long> vertex) {
+            // messages keep the vertices active
+            final Long currentValue = vertex.getValue();
+            sendMessageToAllNeighbors(currentValue);
+        }
+    }
+
+    /**
+     * Messaging function that asserts the in- and out-degree reported for the vertices
+     * with ids 1 and 3 and then sends the vertex value to all neighbors.
+     */
+    @SuppressWarnings("serial")
+    public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, Long> vertex) {
+            final Long vertexId = vertex.getId();
+
+            // The ids must be compared with long literals: Long.equals(Object) returns false
+            // for an Integer argument, which silently skipped the assertions below.
+            if (vertexId.equals(1L)) {
+                Assert.assertEquals(2, getOutDegree());
+                Assert.assertEquals(1, getInDegree());
+            }
+            else if (vertexId.equals(3L)) {
+                Assert.assertEquals(2, getOutDegree());
+                Assert.assertEquals(2, getInDegree());
+            }
+            // send message to keep vertices active
+            sendMessageToAllNeighbors(vertex.getValue());
+        }
+    }
+
+    /**
+     * Vertex update function that replaces the contents of the vertex's hash set with the
+     * values of the messages received in the current superstep.
+     */
+    @SuppressWarnings("serial")
+    public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
+            // start from an empty set on every invocation
+            vertex.getValue().clear();
+
+            for (long received : messages) {
+                vertex.getValue().add(received);
+            }
+
+            final HashSet<Long> collected = vertex.getValue();
+            setNewVertexValue(collected);
+        }
+    }
+
+    /** Vertex update function that stores the reported in-degree as the new vertex value. */
+    @SuppressWarnings("serial")
+    public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+            setNewVertexValue(getInDegree());
+        }
+    }
+
+    /** Vertex update function that stores the reported out-degree as the new vertex value. */
+    @SuppressWarnings("serial")
+    public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+            setNewVertexValue(getOutDegree());
+        }
+    }
+
+    /**
+     * Vertex update function that counts the received messages and sets the vertex value to
+     * {@code true} iff that count equals the sum of the reported in- and out-degree.
+     */
+    @SuppressWarnings("serial")
+    public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Boolean> vertex, MessageIterator<Long> messages) throws Exception {
+            long received = 0;
+            for (@SuppressWarnings("unused") long ignored : messages) {
+                received++;
+            }
+
+            final long degreeSum = getInDegree() + getOutDegree();
+            final boolean matchesDegreeSum = received == degreeSum;
+            setNewVertexValue(matchesDegreeSum);
+        }
+    }
+
+    /** Vertex update function that stores the sum of the reported in- and out-degree as the new vertex value. */
+    @SuppressWarnings("serial")
+    public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+        @Override
+        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+            setNewVertexValue(getInDegree() + getOutDegree());
+        }
+    }
+
+    /** Messaging function that sends the vertex id to the source of every edge returned by {@code getEdges()}. */
+    @SuppressWarnings("serial")
+    public static final class IdMessengerSrc extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+            for (Edge<Long, Long> current : getEdges()) {
+                final Long recipient = current.getSource();
+                sendMessageTo(recipient, vertex.getId());
+            }
+        }
+    }
+
+    /**
+     * Messaging function that sends the vertex id to the opposite endpoint of every edge
+     * returned by {@code getEdges()}: to the source if the vertex is not the source itself,
+     * otherwise to the target.
+     */
+    @SuppressWarnings("serial")
+    public static final class IdMessengerAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+            for (Edge<Long, Long> edge : getEdges()) {
+                // ids are boxed Longs: compare by value, not by reference
+                if (!edge.getSource().equals(vertex.getId())) {
+                    sendMessageTo(edge.getSource(), vertex.getId());
+                } else {
+                    sendMessageTo(edge.getTarget(), vertex.getId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Messaging function for vertices with a {@code Boolean} value that sends the vertex id
+     * to the opposite endpoint of every edge returned by {@code getEdges()}: to the source
+     * if the vertex is not the source itself, otherwise to the target.
+     */
+    @SuppressWarnings("serial")
+    public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
+            for (Edge<Long, Long> edge : getEdges()) {
+                // ids are boxed Longs: compare by value, not by reference
+                if (!edge.getSource().equals(vertex.getId())) {
+                    sendMessageTo(edge.getSource(), vertex.getId());
+                } else {
+                    sendMessageTo(edge.getTarget(), vertex.getId());
+                }
+            }
+        }
+    }
+
+    /** Messaging function that sends the vertex id to the target of every edge returned by {@code getEdges()}. */
+    @SuppressWarnings("serial")
+    public static final class IdMessengerTrg extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+        @Override
+        public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+            for (Edge<Long, Long> current : getEdges()) {
+                final Long recipient = current.getTarget();
+                sendMessageTo(recipient, vertex.getId());
+            }
+        }
+    }
+
+    /** Map function that returns the constant {@code 1L} for every vertex, ignoring its content. */
+    @SuppressWarnings("serial")
+    public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+        @Override
+        public Long map(Vertex<Long, Long> value) {
+            // upper-case suffix: a lower-case 'l' is easily misread as the digit one
+            return 1L;
+        }
+    }
+
+    /** Map function that returns a fresh, empty {@code HashSet<Long>} for every vertex, ignoring its content. */
+    @SuppressWarnings("serial")
+    public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+
+        @Override
+        public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+            final HashSet<Long> emptySet = new HashSet<Long>();
+            return emptySet;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..b0bacc4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.ConnectedComponents;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs the {@link ConnectedComponents} example on the edges from
+ * {@link ConnectedComponentsDefaultData} and compares the written result with
+ * {@code ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID}.
+ *
+ * <p>NOTE(review): the comparison happens in {@link #after()}, so it presumably also runs
+ * when the test body failed before {@code expected} was assigned - confirm this is intended.
+ */
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /** URI of the temporary file holding the input edges. */
+    private String edgesPath;
+
+    /** URI of the temporary file the example writes its result to. */
+    private String resultPath;
+
+    /** Expected result, assigned by the test method and checked in {@link #after()}. */
+    private String expected;
+
+    public ConnectedComponentsITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /** Creates the result file and writes the default edges to a temporary input file. */
+    @Before
+    public void before() throws Exception {
+        final File resultFile = tempFolder.newFile();
+        resultPath = resultFile.toURI().toString();
+
+        final File edgesInput = tempFolder.newFile();
+        Files.write(ConnectedComponentsDefaultData.EDGES, edgesInput, Charsets.UTF_8);
+        edgesPath = edgesInput.toURI().toString();
+    }
+
+    @Test
+    public void testConnectedComponentsExample() throws Exception {
+        final String maxIterations = String.valueOf(ConnectedComponentsDefaultData.MAX_ITERATIONS);
+        final String[] programArgs = {edgesPath, resultPath, maxIterations};
+
+        ConnectedComponents.main(programArgs);
+        expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
+    }
+
+    /** Compares the content written to the result path with the expected lines. */
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
new file mode 100644
index 0000000..183c429
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.EuclideanGraphWeighing;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs the {@link EuclideanGraphWeighing} example on the vertices and edges from
+ * {@link EuclideanGraphData} and compares the written result with
+ * {@code EuclideanGraphData.RESULTED_WEIGHTED_EDGES}.
+ *
+ * <p>NOTE(review): the comparison happens in {@link #after()}, so it presumably also runs
+ * when the test body failed before {@code expected} was assigned - confirm this is intended.
+ */
+@RunWith(Parameterized.class)
+public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /** URI of the temporary file holding the input vertices. */
+    private String verticesPath;
+
+    /** URI of the temporary file holding the input edges. */
+    private String edgesPath;
+
+    /** URI of the temporary file the example writes its result to. */
+    private String resultPath;
+
+    /** Expected result, assigned by the test method and checked in {@link #after()}. */
+    private String expected;
+
+    public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /** Creates the result file and writes the vertex and edge data to temporary input files. */
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+        verticesPath = writeToTempFile(EuclideanGraphData.VERTICES);
+        edgesPath = writeToTempFile(EuclideanGraphData.EDGES);
+    }
+
+    @Test
+    public void testGraphWeightingWeighing() throws Exception {
+        final String[] programArgs = {verticesPath, edgesPath, resultPath};
+
+        EuclideanGraphWeighing.main(programArgs);
+        expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+    }
+
+    /** Compares the content written to the result path with the expected lines. */
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    /** Writes the given content as UTF-8 to a new temporary file and returns that file's URI. */
+    private String writeToTempFile(CharSequence content) throws Exception {
+        final File target = tempFolder.newFile();
+        Files.write(content, target, Charsets.UTF_8);
+        return target.toURI().toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..c19411b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.IncrementalSSSP;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Integration test for the {@link IncrementalSSSP} example. One test runs the example's
+ * {@code main} method on files written from {@link IncrementalSSSPData}; the other builds
+ * the graphs directly and removes an edge that is described as not being an SP edge.
+ *
+ * <p>NOTE(review): the comparison happens in {@link #after()}, so it presumably also runs
+ * when a test body failed before {@code expected} was assigned - confirm this is intended.
+ */
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /** URI of the temporary file holding the input vertices. */
+    private String verticesPath;
+
+    /** URI of the temporary file holding the input edges. */
+    private String edgesPath;
+
+    /** URI of the temporary file holding the edges contained in the SSSP. */
+    private String edgesInSSSPPath;
+
+    /** URI of the temporary file the result is written to. */
+    private String resultPath;
+
+    /** Expected result, assigned by the test methods and checked in {@link #after()}. */
+    private String expected;
+
+    public IncrementalSSSPITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /** Creates the result file and writes vertices, edges and SSSP edges to temporary input files. */
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+        verticesPath = writeToTempFile(IncrementalSSSPData.VERTICES);
+        edgesPath = writeToTempFile(IncrementalSSSPData.EDGES);
+        edgesInSSSPPath = writeToTempFile(IncrementalSSSPData.EDGES_IN_SSSP);
+    }
+
+    @Test
+    public void testIncrementalSSSP() throws Exception {
+        final String[] programArgs = {
+            verticesPath,
+            edgesPath,
+            edgesInSSSPPath,
+            IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED,
+            IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
+            IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,
+            resultPath,
+            String.valueOf(IncrementalSSSPData.NUM_VERTICES)
+        };
+
+        IncrementalSSSP.main(programArgs);
+        expected = IncrementalSSSPData.RESULTED_VERTICES;
+    }
+
+    @Test
+    public void testIncrementalSSSPNonSPEdge() throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        final DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
+        final DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
+        final DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+
+        // this edge is not part of the shortest paths
+        final Edge<Long, Double> removedEdge = new Edge<Long, Double>(3L, 5L, 5.0);
+
+        final Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+        // assumes that every minimum-weight path is contained in the SSSP edges
+        final Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+        // NOTE(review): the return value of removeEdge is discarded and 'graph' is not used
+        // afterwards - confirm whether the updated graph was meant to be kept.
+        graph.removeEdge(removedEdge);
+
+        // settings for the vertex-centric iteration
+        final VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+        if (!IncrementalSSSP.isInSSSP(removedEdge, edgesInSSSP)) {
+            // write the input vertices out as they are
+            vertices.writeAsCsv(resultPath, "\n", ",");
+        } else {
+            parameters.setDirection(EdgeDirection.IN);
+            parameters.setOptDegrees(true);
+
+            // propagate the information with a vertex-centric iteration
+            final Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(
+                new IncrementalSSSP.VertexDistanceUpdater(),
+                new IncrementalSSSP.InvalidateMessenger(removedEdge),
+                IncrementalSSSPData.NUM_VERTICES, parameters);
+
+            result.getVertices().writeAsCsv(resultPath, "\n", ",");
+        }
+        env.execute();
+
+        expected = IncrementalSSSPData.VERTICES;
+    }
+
+    /** Compares the content written to the result path with the expected lines. */
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    /** Writes the given content as UTF-8 to a new temporary file and returns that file's URI. */
+    private String writeToTempFile(CharSequence content) throws Exception {
+        final File target = tempFolder.newFile();
+        Files.write(content, target, Charsets.UTF_8);
+        return target.toURI().toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
new file mode 100644
index 0000000..294a756
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasure;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs the {@link JaccardSimilarityMeasure} example on the edges from
+ * {@link JaccardSimilarityMeasureData} and compares the written result with
+ * {@code JaccardSimilarityMeasureData.JACCARD_EDGES}.
+ *
+ * <p>NOTE(review): the comparison happens in {@link #after()}, so it presumably also runs
+ * when the test body failed before {@code expected} was assigned - confirm this is intended.
+ */
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /** URI of the temporary file holding the input edges. */
+    private String edgesPath;
+
+    /** URI of the temporary file the example writes its result to. */
+    private String resultPath;
+
+    /** Expected result, assigned by the test method and checked in {@link #after()}. */
+    private String expected;
+
+    public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /** Creates the result file and writes the edge data to a temporary input file. */
+    @Before
+    public void before() throws Exception {
+        final File resultFile = tempFolder.newFile();
+        resultPath = resultFile.toURI().toString();
+
+        final File edgesInput = tempFolder.newFile();
+        Files.write(JaccardSimilarityMeasureData.EDGES, edgesInput, Charsets.UTF_8);
+        edgesPath = edgesInput.toURI().toString();
+    }
+
+    @Test
+    public void testJaccardSimilarityMeasureExample() throws Exception {
+        final String[] programArgs = {edgesPath, resultPath};
+
+        JaccardSimilarityMeasure.main(programArgs);
+        expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+    }
+
+    /** Compares the content written to the result path with the expected lines. */
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+}