You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:51 UTC

[13/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
new file mode 100644
index 0000000..7553b32
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test case for the given execution mode (the class runs under the
+	 * Parameterized runner).
+	 *
+	 * @param mode execution mode, handed unchanged to the MultipleProgramsTestBase constructor
+	 */
+	public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    // Expected output of the running test; every test assigns it right before the comparison.
+    private String expectedResult;
+
+	@Test
+	public void testSumOfOutNeighbors() throws Exception {
+		// Every vertex is paired with the sum of the values of its out-neighbors.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> outNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT)
+				.collect();
+
+		expectedResult = "1,5\n"
+				+ "2,3\n"
+				+ "3,9\n"
+				+ "4,5\n"
+				+ "5,1\n";
+
+		compareResultAsTuples(outNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfInNeighbors() throws Exception {
+		// Every vertex is paired with the sum, over its in-neighbors, of
+		// (neighbor value * weight of the connecting edge).
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> weightedInNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN)
+				.collect();
+
+		expectedResult = "1,255\n"
+				+ "2,12\n"
+				+ "3,59\n"
+				+ "4,102\n"
+				+ "5,285\n";
+
+		compareResultAsTuples(weightedInNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOAllNeighbors() throws Exception {
+		// Every vertex is paired with the sum of the values of all its neighbors
+		// (both edge directions) plus its own value.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> allNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL)
+				.collect();
+
+		expectedResult = "1,11\n"
+				+ "2,6\n"
+				+ "3,15\n"
+				+ "4,12\n"
+				+ "5,13\n";
+
+		compareResultAsTuples(allNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception {
+		// Sum of out-neighbor values, reported only for vertices whose id is greater than three.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> outNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT)
+				.collect();
+
+		expectedResult = "4,5\n"
+				+ "5,1\n";
+
+		compareResultAsTuples(outNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfInNeighborsIdGreaterThanThree() throws Exception {
+		// Sum over in-neighbors of (neighbor value * edge weight), reported only for
+		// vertices whose id is greater than three.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> weightedInNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN)
+				.collect();
+
+		expectedResult = "4,102\n"
+				+ "5,285\n";
+
+		compareResultAsTuples(weightedInNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception {
+		// Sum of all neighbor values plus the vertex's own value, reported only for
+		// vertices whose id is greater than three.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> allNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL)
+				.collect();
+
+		expectedResult = "4,12\n"
+				+ "5,13\n";
+
+		compareResultAsTuples(allNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOutNeighborsNoValue() throws Exception {
+		// Sum of out-neighbor values per vertex, computed with reduceOnNeighbors()
+		// and a pairwise ReduceNeighborsFunction.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> outNeighborSums = testGraph
+				.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT)
+				.collect();
+
+		expectedResult = "1,5\n"
+				+ "2,3\n"
+				+ "3,9\n"
+				+ "4,5\n"
+				+ "5,1\n";
+
+		compareResultAsTuples(outNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfInNeighborsNoValue() throws Exception {
+		// Sum over in-neighbors of (neighbor value * edge weight) per vertex, computed with a
+		// NeighborsFunction, i.e. without access to the vertex's own value.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> weightedInNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN)
+				.collect();
+
+		expectedResult = "1,255\n"
+				+ "2,12\n"
+				+ "3,59\n"
+				+ "4,102\n"
+				+ "5,285\n";
+
+		compareResultAsTuples(weightedInNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfAllNeighborsNoValue() throws Exception {
+		// Sum of the values of all neighbors (both edge directions) per vertex, computed with
+		// reduceOnNeighbors() and a pairwise ReduceNeighborsFunction.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> allNeighborSums = testGraph
+				.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL)
+				.collect();
+
+		expectedResult = "1,10\n"
+				+ "2,4\n"
+				+ "3,12\n"
+				+ "4,8\n"
+				+ "5,8\n";
+
+		compareResultAsTuples(allNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		// For vertices with id greater than two: the sum of out-neighbor values,
+		// followed by that same sum multiplied by two.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> outNeighborSums = testGraph
+				.groupReduceOnNeighbors(
+						new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT)
+				.collect();
+
+		expectedResult = "3,9\n"
+				+ "3,18\n"
+				+ "4,5\n"
+				+ "4,10\n"
+				+ "5,1\n"
+				+ "5,2";
+
+		compareResultAsTuples(outNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		// For vertices with id greater than two: the sum over in-neighbors of
+		// (neighbor value * edge weight), followed by that same sum multiplied by two.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> weightedInNeighborSums = testGraph
+				.groupReduceOnNeighbors(
+						new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN)
+				.collect();
+
+		expectedResult = "3,59\n"
+				+ "3,118\n"
+				+ "4,204\n"
+				+ "4,102\n"
+				+ "5,570\n"
+				+ "5,285";
+
+		compareResultAsTuples(weightedInNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		// For vertices with id greater than two: the sum of all neighbor values
+		// (both edge directions), followed by that same sum multiplied by two.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> allNeighborSums = testGraph
+				.groupReduceOnNeighbors(
+						new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL)
+				.collect();
+
+		expectedResult = "3,12\n"
+				+ "3,24\n"
+				+ "4,8\n"
+				+ "4,16\n"
+				+ "5,8\n"
+				+ "5,16";
+
+		compareResultAsTuples(allNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOutNeighborsMultipliedByTwo() throws Exception {
+		// For every vertex: the sum of out-neighbor values, followed by that sum multiplied by two.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> outNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT)
+				.collect();
+
+		expectedResult = "1,5\n"
+				+ "1,10\n"
+				+ "2,3\n"
+				+ "2,6\n"
+				+ "3,9\n"
+				+ "3,18\n"
+				+ "4,5\n"
+				+ "4,10\n"
+				+ "5,1\n"
+				+ "5,2";
+
+		compareResultAsTuples(outNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfInNeighborsSubtractOne() throws Exception {
+		// For every vertex: the sum over in-neighbors of (neighbor value * edge weight),
+		// followed by that same sum minus one.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> weightedInNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN)
+				.collect();
+
+		expectedResult = "1,255\n"
+				+ "1,254\n"
+				+ "2,12\n"
+				+ "2,11\n"
+				+ "3,59\n"
+				+ "3,58\n"
+				+ "4,102\n"
+				+ "4,101\n"
+				+ "5,285\n"
+				+ "5,284";
+
+		compareResultAsTuples(weightedInNeighborSums, expectedResult);
+	}
+
+	@Test
+	public void testSumOfOAllNeighborsAddFive() throws Exception {
+		// For every vertex: the sum of all neighbor values plus the vertex's own value,
+		// followed by that same sum plus five.
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final List<Tuple2<Long, Long>> allNeighborSums = testGraph
+				.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL)
+				.collect();
+
+		expectedResult = "1,11\n"
+				+ "1,16\n"
+				+ "2,6\n"
+				+ "2,11\n"
+				+ "3,15\n"
+				+ "3,20\n"
+				+ "4,12\n"
+				+ "4,17\n"
+				+ "5,13\n"
+				+ "5,18";
+
+		compareResultAsTuples(allNeighborSums, expectedResult);
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+	Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+			
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+		
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+	
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+					sum += neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			}
+		}
+	}
+
+	/**
+	 * Pairwise reducer that combines two neighbor values by adding them.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
+
+		@Override
+		public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
+			final long combined = firstNeighbor + secondNeighbor;
+			return combined;
+		}
+	}
+
+	/**
+	 * For a non-empty group of neighbor tuples, emits (f0 of the last tuple, sum), where sum adds
+	 * up (neighbor vertex value * edge value) over the group. An empty group emits nothing.
+	 * NOTE(review): f0 is presumably the id of the vertex the group belongs to - confirm.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			if (!neighborsIterator.hasNext()) {
+				// Without at least one tuple there is no id to read from f0, so emit nothing
+				// instead of dereferencing a null tuple.
+				return;
+			}
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next;
+			do {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue() * next.f1.getValue();
+			} while (neighborsIterator.hasNext());
+
+			out.collect(new Tuple2<Long, Long>(next.f0, sum));
+		}
+	}
+
+	/**
+	 * For a non-empty group of neighbor tuples whose id field (f0 of the last tuple) is greater
+	 * than two, emits the sum of the neighbor vertex values and then that sum multiplied by two.
+	 * An empty group emits nothing.
+	 * NOTE(review): f0 is presumably the id of the vertex the group belongs to - confirm.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			if (!neighborsIterator.hasNext()) {
+				// Without at least one tuple there is no id to read from f0, so emit nothing
+				// instead of dereferencing a null tuple.
+				return;
+			}
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next;
+			do {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
+			} while (neighborsIterator.hasNext());
+
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
+		}
+	}
+
+	/**
+	 * For a non-empty group of neighbor tuples whose id field (f0 of the last tuple) is greater
+	 * than two, emits the sum of (neighbor vertex value * edge value) and then that sum
+	 * multiplied by two. An empty group emits nothing.
+	 * NOTE(review): f0 is presumably the id of the vertex the group belongs to - confirm.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			if (!neighborsIterator.hasNext()) {
+				// Without at least one tuple there is no id to read from f0, so emit nothing
+				// instead of dereferencing a null tuple.
+				return;
+			}
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next;
+			do {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue() * next.f1.getValue();
+			} while (neighborsIterator.hasNext());
+
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
+		}
+	}
+
+	/**
+	 * For a non-empty group of neighbor tuples whose id field (f0 of the last tuple) is greater
+	 * than two, emits the sum of the neighbor vertex values and then that sum multiplied by two.
+	 * An empty group emits nothing.
+	 * NOTE(review): f0 is presumably the id of the vertex the group belongs to - confirm.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			if (!neighborsIterator.hasNext()) {
+				// Without at least one tuple there is no id to read from f0, so emit nothing
+				// instead of dereferencing a null tuple.
+				return;
+			}
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next;
+			do {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
+			} while (neighborsIterator.hasNext());
+
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
new file mode 100644
index 0000000..b32abeb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class ReduceOnNeighborsWithExceptionITCase {
+
+	private static final int PARALLELISM = 4;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+
+	/**
+	 * Creates and starts the ForkableFlinkMiniCluster shared by all tests of this class, with
+	 * the task-manager slot count set to PARALLELISM. Any exception fails the class set-up.
+	 */
+	@BeforeClass
+	public static void setupCluster() {
+		try {
+			final Configuration clusterConfig = new Configuration();
+			clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+
+			cluster = new ForkableFlinkMiniCluster(clusterConfig, false);
+			cluster.start();
+		}
+		catch (Exception startupError) {
+			startupError.printStackTrace();
+			fail("Error starting test cluster: " + startupError.getMessage());
+		}
+	}
+
+	/**
+	 * Stops the shared mini cluster after all tests have run. Any error raised while stopping
+	 * is printed and reported as a test failure.
+	 */
+	@AfterClass
+	public static void tearDownCluster() {
+		// If the cluster was never created there is nothing to stop; returning here avoids a
+		// NullPointerException that would be reported as a misleading shutdown failure.
+		if (cluster == null) {
+			return;
+		}
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	/**
+	 * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
+	 * with an edge having a srcId that does not exist in the vertex DataSet.
+	 * The job is expected to fail; the test fails if the job completes normally.
+	 */
+	@Test
+	public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
+
+		try {
+			DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+					graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+
+			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// Reached only if the job did not fail. fail() throws an AssertionError, which the
+			// catch (Exception) below does not intercept, so the test is reported as failed.
+			fail("Expected an exception.");
+		} catch (Exception e) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
+	 * with an edge having a trgId that does not exist in the vertex DataSet.
+	 * The job is expected to fail; the test fails if the job completes normally.
+	 */
+	@Test
+	public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
+
+		try {
+			DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+					graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+
+			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// Reached only if the job did not fail. fail() throws an AssertionError, which the
+			// catch (Exception) below does not intercept, so the test is reported as failed.
+			fail("Expected an exception.");
+		} catch (Exception e) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test reduceOnNeighbors() -ReduceNeighborsFunction-
+	 * with an edge having a srcId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		// Uses the edge set with the invalid source id, matching the test name.
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
+
+		try {
+			DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
+					graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
+
+			verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+		} catch (Exception e) {
+			// We expect the job to fail with an exception
+			// NOTE(review): nothing asserts that the job actually fails - confirm whether
+			// reduceOnNeighbors() is meant to reject dangling edge ids before adding fail().
+		}
+	}
+
+	/**
+	 * Test reduceOnNeighbors() -ReduceNeighborsFunction-
+	 * with an edge having a trgId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		// Uses the edge set with the invalid target id, matching the test name.
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
+
+		try {
+			DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
+					graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
+
+			verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+		} catch (Exception e) {
+			// We expect the job to fail with an exception
+			// NOTE(review): nothing asserts that the job actually fails - confirm whether
+			// reduceOnNeighbors() is meant to reject dangling edge ids before adding fail().
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+		}
+	}
+
+	/**
+	 * Pairwise reducer that combines two neighbor values by adding them.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
+
+		@Override
+		public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
+			final long combined = firstNeighbor + secondNeighbor;
+			return combined;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
new file mode 100644
index 0000000..b38ffeb
--- /dev/null
+++ b/flink-libraries/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-libraries</artifactId>
+	<name>flink-libraries</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-gelly</module>
+		<module>flink-gelly-scala</module>
+	</modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml
deleted file mode 100644
index edcb865..0000000
--- a/flink-staging/flink-gelly-scala/pom.xml
+++ /dev/null
@@ -1,204 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-staging</artifactId>
-        <version>0.10-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>flink-gelly-scala</artifactId>
-
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-gelly</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <!-- Scala Compiler -->
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>3.1.4</version>
-                <executions>
-                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                        scala classes can be resolved later in the (Java) compile phase -->
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-
-                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                         scala classes can be resolved later in the (Java) test-compile phase -->
-                    <execution>
-                        <id>scala-test-compile</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <jvmArgs>
-                        <jvmArg>-Xms128m</jvmArg>
-                        <jvmArg>-Xmx512m</jvmArg>
-                    </jvmArgs>
-                    <compilerPlugins combine.children="append">
-                        <compilerPlugin>
-                            <groupId>org.scalamacros</groupId>
-                            <artifactId>paradise_${scala.version}</artifactId>
-                            <version>${scala.macros.version}</version>
-                        </compilerPlugin>
-                    </compilerPlugins>
-                </configuration>
-            </plugin>
-
-            <!-- Eclipse Integration -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-eclipse-plugin</artifactId>
-                <version>2.8</version>
-                <configuration>
-                    <downloadSources>true</downloadSources>
-                    <projectnatures>
-                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                    </projectnatures>
-                    <buildcommands>
-                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                    </buildcommands>
-                    <classpathContainers>
-                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                    </classpathContainers>
-                    <excludes>
-                        <exclude>org.scala-lang:scala-library</exclude>
-                        <exclude>org.scala-lang:scala-compiler</exclude>
-                    </excludes>
-                    <sourceIncludes>
-                        <sourceInclude>**/*.scala</sourceInclude>
-                        <sourceInclude>**/*.java</sourceInclude>
-                    </sourceIncludes>
-                </configuration>
-            </plugin>
-
-            <!-- Adding scala source directories to build path -->
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.7</version>
-                <executions>
-                    <!-- Add src/main/scala to eclipse build path -->
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/main/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <!-- Add src/test/scala to eclipse build path -->
-                    <execution>
-                        <id>add-test-source</id>
-                        <phase>generate-test-sources</phase>
-                        <goals>
-                            <goal>add-test-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/test/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.scalastyle</groupId>
-                <artifactId>scalastyle-maven-plugin</artifactId>
-                <version>0.5.0</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>check</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <verbose>false</verbose>
-                    <failOnViolation>true</failOnViolation>
-                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
-                    <failOnWarning>false</failOnWarning>
-                    <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-                    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-                    <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                    <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-                    <outputEncoding>UTF-8</outputEncoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
deleted file mode 100644
index 70a5fdf..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * Scala-friendly variant of the Java EdgesFunction: implementors receive the edges as a
- * Scala Iterable of (key, edge) pairs instead of a Java Iterable of Java Tuple2 values.
- */
-abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
-
-  /** User logic operating on Scala pairs; results are emitted through `out`. */
-  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
-
-  /** Java-facing entry point: converts the input to Scala pairs and delegates. */
-  override def iterateEdges(
-      edges: java.lang.Iterable[Tuple2[K, Edge[K, EV]]],
-      out: Collector[T]): Unit = {
-    val asScala = scala.collection.JavaConversions.iterableAsScalaIterable(edges)
-    val keyedEdges = asScala.map(pair => (pair.f0, pair.f1))
-    iterateEdges(keyedEdges, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
deleted file mode 100644
index 82589b6..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.Collector
-
-/**
- * Scala-friendly variant of the Java EdgesFunctionWithVertexValue: implementors receive
- * the edges of a vertex as a Scala Iterable instead of a Java Iterable.
- */
-abstract class EdgesFunctionWithVertexValue[K, VV, EV, T]
-  extends org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {
-
-  /** User logic operating on a Scala Iterable; results are emitted through `out`. */
-  @throws(classOf[Exception])
-  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
-
-  /** Java-facing entry point: converts the edges to a Scala Iterable and delegates. */
-  override def iterateEdges(
-      v: Vertex[K, VV],
-      edges: java.lang.Iterable[Edge[K, EV]],
-      out: Collector[T]): Unit = {
-    val scalaEdges = scala.collection.JavaConversions.iterableAsScalaIterable(edges)
-    iterateEdges(v, scalaEdges, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
deleted file mode 100644
index 28f3f12..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ /dev/null
@@ -1,1014 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{tuple => jtuple}
-import org.apache.flink.api.scala._
-import org.apache.flink.graph._
-import org.apache.flink.graph.validation.GraphValidator
-import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
-import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
-import org.apache.flink.{graph => jg}
-import _root_.scala.collection.JavaConverters._
-import _root_.scala.reflect.ClassTag
-import org.apache.flink.types.NullValue
-
-object Graph {
-
-  /**
-  * Builds a Graph from a DataSet of vertices and a DataSet of edges by delegating to the
-  * Java Graph API and wrapping the result.
-  */
-  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
-                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaGraph = jg.Graph.fromDataSet[K, VV, EV](
-      vertices.javaSet, edges.javaSet, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a DataSet of edges only.
-  * The vertices are derived automatically and carry NullValue as their value.
-  */
-  def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
-  (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
-    val javaGraph = jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a DataSet of edges only.
-  * The vertices are derived automatically; each vertex value is obtained by applying
-  * the given map function to the vertex id.
-  */
-  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaGraph = jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a Seq of vertices and a Seq of edges.
-  * Both sequences are handed to the Java Graph API as Java collections.
-  */
-  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
-  ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaGraph = jg.Graph.fromCollection[K, VV, EV](
-      vertices.asJavaCollection, edges.asJavaCollection, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a Seq of edges only.
-  * The vertices are derived automatically and carry NullValue as their value.
-  */
-  def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
-  (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
-    val javaGraph = jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a Seq of edges only.
-  * The vertices are derived automatically; each vertex value is obtained by applying
-  * the given map function to the vertex id.
-  */
-  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaGraph = jg.Graph.fromCollection[K, VV, EV](
-      edges.asJavaCollection, mapper, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from DataSets of Scala tuples: (id, value) pairs for the vertices and
-  * (source, target, value) triples for the edges. The Scala tuples are converted to Java
-  * tuples before being handed to the Java Graph API.
-  */
-  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
-                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaTupleVertices = vertices.map(vertex => new jtuple.Tuple2(vertex._1, vertex._2)).javaSet
-    val javaTupleEdges = edges.map(edge => new jtuple.Tuple3(edge._1, edge._2, edge._3)).javaSet
-    val javaGraph = jg.Graph.fromTupleDataSet[K, VV, EV](
-      javaTupleVertices, javaTupleEdges, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a DataSet of (source, target, value) triples representing the edges.
-  * The vertices are derived automatically and carry NullValue as their value.
-  */
-  def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
-  (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
-    val javaTupleEdges = edges.map(edge => new jtuple.Tuple3(edge._1, edge._2, edge._3)).javaSet
-    val javaGraph = jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Builds a Graph from a DataSet of (source, target, value) triples representing the edges.
-  * The vertices are derived automatically; each vertex value is obtained by applying
-  * the given map function to the vertex id.
-  */
-  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaTupleEdges = edges.map(edge => new jtuple.Tuple3(edge._1, edge._2, edge._3)).javaSet
-    val javaGraph = jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv)
-    wrapGraph(javaGraph)
-  }
-
-  /**
-  * Creates a Graph from a CSV file of vertices and a CSV file of edges.
-  *
-  * Depending on `readVertices` and `hasEdgeValues`, vertex values and/or edge values are
-  * read from the files or replaced by NullValue. When no vertices are read, an optional
-  * `mapper` can supply the vertex values.
-  *
-  * @param env The Execution Environment.
-  * @param pathEdges The file path containing the edges.
-  * @param readVertices Defines whether the vertices have associated values.
-  * If set to false, the vertex input is ignored and vertices are created from the edges file.
-  * @param pathVertices The file path containing the vertices.
-  * Must be non-null when readVertices is true.
-  * @param hasEdgeValues Defines whether the edges have associated values. True by default.
-  * @param lineDelimiterVertices The string that separates lines in the vertices file.
-  * It defaults to newline.
-  * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values
-  * in the vertices file.
-  * @param quoteCharacterVertices The character to use for quoted String parsing
-  * in the vertices file. Disabled by default.
-  * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
-  * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
-  * are ignored, disabled by default.
-  * @param lenientVertices Whether the parser should silently ignore malformed lines in the
-  * vertices file.
-  * @param includedFieldsVertices The fields in the vertices file that should be read.
-  * By default all fields are read.
-  * @param lineDelimiterEdges The string that separates lines in the edges file.
-  * It defaults to newline.
-  * @param fieldDelimiterEdges The string that separates fields in the edges file.
-  * @param quoteCharacterEdges The character to use for quoted String parsing
-  * in the edges file. Disabled by default.
-  * @param ignoreFirstLineEdges Whether the first line in the edges file should be ignored.
-  * @param ignoreCommentsEdges Lines that start with the given String in the edges file
-  * are ignored, disabled by default.
-  * @param lenientEdges Whether the parser should silently ignore malformed lines in the
-  * edges file.
-  * @param includedFieldsEdges The fields in the edges file that should be read.
-  * By default all fields are read.
-  * @param mapper If no vertex values are provided, this mapper can be used to initialize them.
-  * @throws IllegalArgumentException if readVertices is true but pathVertices is null.
-  */
-  // scalastyle:off
-  // This method exceeds the max allowed number of parameters -->
-  def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
-    EV: TypeInformation : ClassTag](
-      env: ExecutionEnvironment,
-      pathEdges: String,
-      readVertices: Boolean,
-      pathVertices: String = null,
-      hasEdgeValues: Boolean = true,
-      lineDelimiterVertices: String = "\n",
-      fieldDelimiterVertices: String = ",",
-      quoteCharacterVertices: Character = null,
-      ignoreFirstLineVertices: Boolean = false,
-      ignoreCommentsVertices: String = null,
-      lenientVertices: Boolean = false,
-      includedFieldsVertices: Array[Int] = null,
-      lineDelimiterEdges: String = "\n",
-      fieldDelimiterEdges: String = ",",
-      quoteCharacterEdges: Character = null,
-      ignoreFirstLineEdges: Boolean = false,
-      ignoreCommentsEdges: String = null,
-      lenientEdges: Boolean = false,
-      includedFieldsEdges: Array[Int] = null,
-      mapper: MapFunction[K, VV] = null) = {
-
-    // with vertex and edge values
-    if (readVertices && hasEdgeValues) {
-      // Compare with == : calling .equals on a null reference would throw a
-      // NullPointerException before the intended argument check could fire.
-      if (pathVertices == null) {
-        throw new IllegalArgumentException(
-            "The vertices file path must be specified when readVertices is true.")
-      } else {
-        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
-            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
-            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
-
-        val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
-            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
-            includedFieldsEdges)
-
-        fromTupleDataSet[K, VV, EV](vertices, edges, env)
-      }
-    }
-    // with vertex value and no edge value
-    else if (readVertices && (!hasEdgeValues)) {
-      // Same null-safe check as in the branch above.
-      if (pathVertices == null) {
-        throw new IllegalArgumentException(
-            "The vertices file path must be specified when readVertices is true.")
-      } else {
-        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
-            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
-            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
-
-        // Edges are read as (source, target) pairs and padded with NullValue.
-        val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
-            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
-            includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
-
-        fromTupleDataSet[K, VV, NullValue](vertices, edges, env)
-      }
-    }
-    // with edge value and no vertex value
-    else if ((!readVertices) && hasEdgeValues) {
-      val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
-        quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
-        includedFieldsEdges)
-
-      // initializer provided
-      if (mapper != null) {
-        fromTupleDataSet[K, VV, EV](edges, mapper, env)
-      }
-      else {
-        fromTupleDataSet[K, EV](edges, env)
-      }
-    }
-    // with no edge value and no vertex value
-    else {
-      // Edges are read as (source, target) pairs and padded with NullValue.
-      val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
-      quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges,
-      lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
-
-      // initializer provided
-      if (mapper != null) {
-        fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
-      }
-      else {
-        fromTupleDataSet[K, NullValue](edges, env)
-      }
-    }
-  }
-// scalastyle:on
-
-}
-
-/**
- * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
- * @param jgraph the underlying java api Graph.
- * @tparam K the key type for vertex and edge identifiers
- * @tparam VV the value type for vertices
- * @tparam EV the value type for edges
- * @see org.apache.flink.graph.Edge
- * @see org.apache.flink.graph.Vertex
- */
-final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
-
-  private[flink] def getWrappedGraph = jgraph
-
-
-  /**
-   * Runs the closure cleaner on `f` when it is enabled in the execution config of the
-   * wrapped graph's context, then checks that `f` is serializable and returns it.
-   */
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    val cleanerEnabled = jgraph.getContext.getConfig.isClosureCleanerEnabled
-    if (cleanerEnabled) {
-      ClosureCleaner.clean(f, checkSerializable)
-    }
-    // The serializability check runs whether or not the cleaner is enabled.
-    ClosureCleaner.ensureSerializable(f)
-    f
-  }
-
-  /**
-   * @return the vertex DataSet of the wrapped Java graph, as a Scala DataSet.
-   */
-  def getVertices = {
-    val javaVertices = jgraph.getVertices
-    wrap(javaVertices)
-  }
-
-  /**
-   * @return the edge DataSet of the wrapped Java graph, as a Scala DataSet.
-   */
-  def getEdges = {
-    val javaEdges = jgraph.getEdges
-    wrap(javaEdges)
-  }
-
-  /**
-   * @return the vertex DataSet as Scala pairs, built from the two fields of each
-   *         Java tuple.
-   */
-  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
-    val javaTuples = wrap(jgraph.getVerticesAsTuple2)
-    javaTuples.map(t => (t.f0, t.f1))
-  }
-
-  /**
-   * @return the edge DataSet as Scala triples, built from the three fields of each
-   *         Java tuple.
-   */
-  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
-    val javaTuples = wrap(jgraph.getEdgesAsTuple3)
-    javaTuples.map(t => (t.f0, t.f1, t.f2))
-  }
-
-  /**
-  * @return a DataSet of Triplets,
-  * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue),
-  * obtained from the wrapped Java graph.
-  */
-  def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
-    val javaTriplets = jgraph.getTriplets()
-    wrap(javaTriplets)
-  }
-
-  /**
-   * Transforms every vertex with the given MapFunction, producing a graph whose
-   * vertices carry the function's results as their values.
-   *
-   * @param mapper the map function to apply to each vertex.
-   * @tparam NV the new vertex value type
-   * @return a new graph
-   */
-  def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]):
-  Graph[K, NV, EV] = {
-    // The Java API needs the result vertex type passed explicitly.
-    val resultType = createTypeInformation[Vertex[K, NV]]
-    val mappedGraph = jgraph.mapVertices[NV](mapper, resultType)
-    new Graph[K, NV, EV](mappedGraph)
-  }
-
-  /**
-   * Transforms every vertex with the given Scala function, producing a graph whose
-   * vertices carry the function's results as their values.
-   *
-   * @param fun the function to apply to each vertex; it is passed through `clean` first.
-   * @tparam NV the new vertex value type
-   * @return a new graph
-   */
-  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV] = {
-    // Adapt the Scala function to the MapFunction interface expected by the Java API.
-    val vertexMapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, VV], NV] {
-      val cleanFun = clean(fun)
-
-      def map(vertex: Vertex[K, VV]): NV = cleanFun(vertex)
-    }
-    val mappedGraph = jgraph.mapVertices[NV](vertexMapper, createTypeInformation[Vertex[K, NV]])
-    new Graph[K, NV, EV](mappedGraph)
-  }
-
-  /**
-   * Transforms every edge with the given MapFunction, producing a graph whose
-   * edges carry the function's results as their values.
-   *
-   * @param mapper the map function to apply to each edge.
-   * @tparam NV the new edge value type
-   * @return a new graph
-   */
-  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K,
-    VV, NV] = {
-    // The Java API needs the result edge type passed explicitly.
-    val resultType = createTypeInformation[Edge[K, NV]]
-    val mappedGraph = jgraph.mapEdges[NV](mapper, resultType)
-    new Graph[K, VV, NV](mappedGraph)
-  }
-
-  /**
-   * Transforms every edge with the given Scala function, producing a graph whose
-   * edges carry the function's results as their values.
-   *
-   * @param fun the function to apply to each edge; it is passed through `clean` first.
-   * @tparam NV the new edge value type
-   * @return a new graph
-   */
-  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV] = {
-    // Adapt the Scala function to the MapFunction interface expected by the Java API.
-    val edgeMapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, EV], NV] {
-      val cleanFun = clean(fun)
-
-      def map(edge: Edge[K, EV]): NV = cleanFun(edge)
-    }
-    val mappedGraph = jgraph.mapEdges[NV](edgeMapper, createTypeInformation[Edge[K, NV]])
-    new Graph[K, VV, NV](mappedGraph)
-  }
-
-  /**
-   * Joins the vertex DataSet of this graph with a DataSet of (vertex id, value) pairs
-   * and applies a user-defined MapFunction to each (vertex value, input value) pair.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function; receives the pair as a Scala tuple.
-   * @tparam T the type of the second field of the input pairs
-   * @return a new graph where the vertex values have been updated.
-   */
-  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
-    (VV, T), VV]): Graph[K, VV, EV] = {
-    // The Java API works on Java tuples, so convert the input pairs first.
-    val javaTupleSet = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-
-    // Bridge from the Java tuple handed in by the Java API to the Scala-tuple mapper.
-    val javaMapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
-      override def map(value: jtuple.Tuple2[VV, T]): VV = {
-        val scalaPair = (value.f0, value.f1)
-        mapper.map(scalaPair)
-      }
-    }
-    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, javaMapper))
-  }
-
-  /**
-   * Joins the vertex DataSet of this graph with a DataSet of (vertex id, value) pairs
-   * and applies a Scala function to each (vertex value, input value) pair.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param fun the function to apply; it is passed through `clean` first.
-   * @tparam T the type of the second field of the input pairs
-   * @return a new graph where the vertex values have been updated.
-   */
-  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
-  Graph[K, VV, EV] = {
-    // The Java API works on Java tuples, so convert the input pairs first.
-    val javaTupleSet = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-
-    // Bridge from the Java tuple handed in by the Java API to the two-argument function.
-    val javaMapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
-      val cleanFun = clean(fun)
-
-      override def map(value: jtuple.Tuple2[VV, T]): VV = {
-        cleanFun(value.f0, value.f1)
-      }
-    }
-    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, javaMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet on the composite key of
-   * source and target and uses a UDF to compute the new edge values.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function, applied to (edge value, input value) pairs.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
-      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
-    // Adapt the Scala-tuple based mapper to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = {
-        val scalaPair = (joined.f0, joined.f1)
-        mapper.map(scalaPair)
-      }
-    }
-    // Convert the Scala triples of the input into Java Tuple3 before delegating.
-    val javaInput = inputDataSet.map(
-      triple => new jtuple.Tuple3(triple._1, triple._2, triple._3)).javaSet
-    wrapGraph(jgraph.joinWithEdges[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet on the composite key of
-   * source and target and uses a Scala function to compute the new edge values.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF, called with the edge value and the matching input value.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
-      fun: (EV, T) => EV): Graph[K, VV, EV] = {
-    // Adapt the Scala function to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      val cleanFun = clean(fun)
-
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanFun(joined.f0, joined.f1)
-    }
-    // Convert the Scala triples of the input into Java Tuple3 before delegating.
-    val javaInput = inputDataSet.map(
-      triple => new jtuple.Tuple3(triple._1, triple._2, triple._3)).javaSet
-    wrapGraph(jgraph.joinWithEdges[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet, matching the source key
-   * of each edge against the first attribute of the input, and uses a UDF to
-   * compute the new edge values. In case the inputDataSet contains the same key
-   * more than once, only the first value will be considered.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function, applied to (edge value, input value) pairs.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
-      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
-    // Adapt the Scala-tuple based mapper to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = {
-        val scalaPair = (joined.f0, joined.f1)
-        mapper.map(scalaPair)
-      }
-    }
-    // Convert the Scala tuples of the input into Java tuples before delegating.
-    val javaInput = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet, matching the source key
-   * of each edge against the first attribute of the input, and uses a Scala
-   * function to compute the new edge values. In case the inputDataSet contains the
-   * same key more than once, only the first value will be considered.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF, called with the edge value and the matching input value.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
-      fun: (EV, T) => EV): Graph[K, VV, EV] = {
-    // Adapt the Scala function to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      val cleanFun = clean(fun)
-
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanFun(joined.f0, joined.f1)
-    }
-    // Convert the Scala tuples of the input into Java tuples before delegating.
-    val javaInput = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet, matching the target key
-   * of each edge against the first attribute of the input, and uses a UDF to
-   * compute the new edge values. Should the inputDataSet contain the same key
-   * more than once, only the first value will be considered.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function, applied to (edge value, input value) pairs.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
-      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
-    // Adapt the Scala-tuple based mapper to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = {
-        val scalaPair = (joined.f0, joined.f1)
-        mapper.map(scalaPair)
-      }
-    }
-    // Convert the Scala tuples of the input into Java tuples before delegating.
-    val javaInput = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Joins the edges of this graph with the given DataSet, matching the target key
-   * of each edge against the first attribute of the input, and uses a Scala
-   * function to compute the new edge values. Should the inputDataSet contain the
-   * same key more than once, only the first value will be considered.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF, called with the edge value and the matching input value.
-   * @tparam T the type of the values carried by the input DataSet
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
-      fun: (EV, T) => EV): Graph[K, VV, EV] = {
-    // Adapt the Scala function to the Java Tuple2 used by the wrapped Java graph.
-    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      val cleanFun = clean(fun)
-
-      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanFun(joined.f0, joined.f1)
-    }
-    // Convert the Scala tuples of the input into Java tuples before delegating.
-    val javaInput = inputDataSet.map(pair => new jtuple.Tuple2(pair._1, pair._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaInput, tupleMapper))
-  }
-
-  /**
-   * Apply filtering functions to the graph and return a sub-graph that
-   * satisfies the predicates for both vertices and edges.
-   *
-   * @param vertexFilter the filter function for vertices.
-   * @param edgeFilter the filter function for edges.
-   * @return the resulting sub-graph.
-   */
-  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]],
-      edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV] = {
-    // The return type is declared explicitly so the public signature does not
-    // depend on type inference, matching the other graph-returning methods.
-    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
-  }
-
-  /**
-   * Apply filtering functions to the graph and return a sub-graph that
-   * satisfies the predicates for both vertices and edges.
-   *
-   * Both Scala functions are passed through `clean` and wrapped in
-   * FilterFunctions before being handed to the wrapped Java graph.
-   *
-   * @param vertexFilterFun the filter function for vertices.
-   * @param edgeFilterFun the filter function for edges.
-   * @return the resulting sub-graph.
-   */
-  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
-      edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV] = {
-    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
-      val cleanVertexFun = clean(vertexFilterFun)
-
-      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
-    }
-
-    val edgeFilter = new FilterFunction[Edge[K, EV]] {
-      val cleanEdgeFun = clean(edgeFilterFun)
-
-      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
-    }
-
-    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
-  }
-
-  /**
-   * Apply a filtering function to the graph and return a sub-graph that
-   * satisfies the predicates only for the vertices.
-   *
-   * @param vertexFilter the filter function for vertices.
-   * @return the resulting sub-graph.
-   */
-  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV] = {
-    // Explicit return type keeps the public signature independent of inference.
-    wrapGraph(jgraph.filterOnVertices(vertexFilter))
-  }
-
-  /**
-   * Apply a filtering function to the graph and return a sub-graph that
-   * satisfies the predicates only for the vertices.
-   *
-   * The Scala function is passed through `clean` and wrapped in a
-   * FilterFunction before being handed to the wrapped Java graph.
-   *
-   * @param vertexFilterFun the filter function for vertices.
-   * @return the resulting sub-graph.
-   */
-  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV] = {
-    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
-      val cleanVertexFun = clean(vertexFilterFun)
-
-      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
-    }
-
-    wrapGraph(jgraph.filterOnVertices(vertexFilter))
-  }
-
-  /**
-   * Apply a filtering function to the graph and return a sub-graph that
-   * satisfies the predicates only for the edges.
-   *
-   * @param edgeFilter the filter function for edges.
-   * @return the resulting sub-graph.
-   */
-  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV] = {
-    // Explicit return type keeps the public signature independent of inference.
-    wrapGraph(jgraph.filterOnEdges(edgeFilter))
-  }
-
-  /**
-   * Apply a filtering function to the graph and return a sub-graph that
-   * satisfies the predicates only for the edges.
-   *
-   * The Scala function is passed through `clean` and wrapped in a
-   * FilterFunction before being handed to the wrapped Java graph.
-   *
-   * @param edgeFilterFun the filter function for edges.
-   * @return the resulting sub-graph.
-   */
-  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV] = {
-    val edgeFilter = new FilterFunction[Edge[K, EV]] {
-      val cleanEdgeFun = clean(edgeFilterFun)
-
-      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
-    }
-
-    wrapGraph(jgraph.filterOnEdges(edgeFilter))
-  }
-
-  /**
-   * Return the in-degree of all vertices in the graph.
-   *
-   * @return a DataSet of (vertexId, inDegree) tuples
-   */
-  def inDegrees(): DataSet[(K, Long)] = {
-    val javaDegrees = wrap(jgraph.inDegrees)
-    // Re-pack each Java Tuple2 as a Scala tuple.
-    javaDegrees.map(degree => (degree.f0, degree.f1))
-  }
-
-  /**
-   * Return the out-degree of all vertices in the graph.
-   *
-   * @return a DataSet of (vertexId, outDegree) tuples
-   */
-  def outDegrees(): DataSet[(K, Long)] = {
-    val javaDegrees = wrap(jgraph.outDegrees)
-    // Re-pack each Java Tuple2 as a Scala tuple.
-    javaDegrees.map(degree => (degree.f0, degree.f1))
-  }
-
-  /**
-   * Return the degree of all vertices in the graph.
-   *
-   * @return a DataSet of (vertexId, degree) tuples
-   */
-  def getDegrees(): DataSet[(K, Long)] = {
-    val javaDegrees = wrap(jgraph.getDegrees)
-    // Re-pack each Java Tuple2 as a Scala tuple.
-    javaDegrees.map(degree => (degree.f0, degree.f1))
-  }
-
-  /**
-   * Adds all inverse-direction edges to the graph.
-   *
-   * @return the undirected graph.
-   */
-  def getUndirected(): Graph[K, VV, EV] = {
-    val undirected = jgraph.getUndirected
-    new Graph[K, VV, EV](undirected)
-  }
-
-  /**
-   * Reverse the direction of the edges in the graph.
-   *
-   * @return a new graph with all edges reversed
-   * @throws UnsupportedOperationException
-   */
-  def reverse(): Graph[K, VV, EV] = {
-    val reversed = jgraph.reverse()
-    new Graph[K, VV, EV](reversed)
-  }
-
-  /**
-   * Compute an aggregate over the edges of each vertex. The function applied
-   * on the edges has access to the vertex value.
-   *
-   * @param edgesFunction the function to apply to the neighborhood
-   * @param direction     the edge direction (in-, out-, all-)
-   * @tparam T           the output type
-   * @return a dataset of a T
-   */
-  def groupReduceOnEdges[T: TypeInformation : ClassTag](
-      edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T],
-      direction: EdgeDirection): DataSet[T] = {
-    val resultType = createTypeInformation[T]
-    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, resultType))
-  }
-
-  /**
-   * Compute an aggregate over the edges of each vertex. Unlike the
-   * EdgesFunctionWithVertexValue overload, the function type accepted here
-   * carries no vertex value type parameter.
-   *
-   * @param edgesFunction the function to apply to the neighborhood
-   * @param direction     the edge direction (in-, out-, all-)
-   * @tparam T           the output type
-   * @return a dataset of a T
-   */
-  def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T],
-                                                        direction: EdgeDirection): DataSet[T] = {
-    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, createTypeInformation[T]))
-  }
-
-  /**
-   * Compute an aggregate over the neighbors (edges and vertices) of each
-   * vertex. The function applied on the neighbors has access to the vertex
-   * value.
-   *
-   * @param neighborsFunction the function to apply to the neighborhood
-   * @param direction         the edge direction (in-, out-, all-)
-   * @tparam T               the output type
-   * @return a dataset of a T
-   */
-  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
-      neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
-      direction: EdgeDirection): DataSet[T] = {
-    val resultType = createTypeInformation[T]
-    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, resultType))
-  }
-
-  /**
-   * Compute an aggregate over the neighbors (edges and vertices) of each
-   * vertex.
-   *
-   * @param neighborsFunction the function to apply to the neighborhood
-   * @param direction         the edge direction (in-, out-, all-)
-   * @tparam T               the output type
-   * @return a dataset of a T
-   */
-  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
-      neighborsFunction: NeighborsFunction[K, VV, EV, T],
-      direction: EdgeDirection): DataSet[T] = {
-    val resultType = createTypeInformation[T]
-    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, resultType))
-  }
-
-  /**
-   * Delegates to the wrapped Java graph.
-   *
-   * @return a long integer representing the number of vertices
-   */
-  def numberOfVertices(): Long = {
-    val vertexCount: Long = jgraph.numberOfVertices()
-    vertexCount
-  }
-
-  /**
-   * Delegates to the wrapped Java graph.
-   *
-   * @return a long integer representing the number of edges
-   */
-  def numberOfEdges(): Long = {
-    val edgeCount: Long = jgraph.numberOfEdges()
-    edgeCount
-  }
-
-  /**
-   * Wraps the vertex ID DataSet of the underlying Java graph.
-   *
-   * @return The IDs of the vertices as DataSet
-   */
-  def getVertexIds(): DataSet[K] = {
-    val javaVertexIds = jgraph.getVertexIds
-    wrap(javaVertexIds)
-  }
-
-  /**
-   * Wraps the edge ID DataSet of the underlying Java graph and converts each
-   * Java Tuple2 into a Scala tuple.
-   *
-   * @return The IDs of the edges as DataSet
-   */
-  def getEdgeIds(): DataSet[(K, K)] = {
-    val javaEdgeIds = wrap(jgraph.getEdgeIds)
-    javaEdgeIds.map(edgeId => (edgeId.f0, edgeId.f1))
-  }
-
-  /**
-   * Adds the input vertex to the graph. If the vertex already
-   * exists in the graph, it will not be added again.
-   *
-   * @param vertex the vertex to be added
-   * @return the new graph containing the existing vertices as well as the one just added
-   */
-  def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] = {
-    // Explicit return type, consistent with addVertices.
-    wrapGraph(jgraph.addVertex(vertex))
-  }
-
-  /**
-   * Adds the list of vertices, passed as input, to the graph.
-   * If the vertices already exist in the graph, they will not be added once more.
-   *
-   * @param vertices the list of vertices to add
-   * @return the new graph containing the existing and newly added vertices
-   */
-  def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
-    // The wrapped Java graph expects a java.util.List, hence the asJava conversion.
-    wrapGraph(jgraph.addVertices(vertices.asJava))
-  }
-
-  /**
-   * Adds the given list of edges to the graph.
-   *
-   * When adding an edge for a non-existing set of vertices,
-   * the edge is considered invalid and ignored.
-   *
-   * @param edges the list of edges to be added
-   * @return a new graph containing the existing edges plus the newly added edges.
-   */
-  def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
-    // The wrapped Java graph expects a java.util.List, hence the asJava conversion.
-    wrapGraph(jgraph.addEdges(edges.asJava))
-  }
-
-  /**
-   * Adds the given edge to the graph. If the source and target vertices do
-   * not exist in the graph, they will also be added.
-   *
-   * @param source the source vertex of the edge
-   * @param target the target vertex of the edge
-   * @param edgeValue the edge value
-   * @return the new graph containing the existing vertices and edges plus the
-   *         newly added edge
-   */
-  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV],
-      edgeValue: EV): Graph[K, VV, EV] = {
-    // Explicit return type, consistent with addEdges.
-    wrapGraph(jgraph.addEdge(source, target, edgeValue))
-  }
-
-  /**
-   * Removes the given vertex and its edges from the graph.
-   *
-   * @param vertex the vertex to remove
-   * @return the new graph containing the existing vertices and edges without
-   *         the removed vertex and its edges
-   */
-  def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] = {
-    // Explicit return type, consistent with removeVertices.
-    wrapGraph(jgraph.removeVertex(vertex))
-  }
-
-  /**
-   * Removes the given vertices and their edges from the graph.
-   *
-   * @param vertices the list of vertices to remove
-   * @return the new graph containing the existing vertices and edges without
-   *         the removed vertices and their edges
-   */
-  def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
-    // The wrapped Java graph expects a java.util.List, hence the asJava conversion.
-    wrapGraph(jgraph.removeVertices(vertices.asJava))
-  }
-
-  /**
-   * Removes all edges that match the given edge from the graph.
-   *
-   * @param edge the edge to remove
-   * @return the new graph containing the existing vertices and edges without
-   *         the removed edges
-   */
-  def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV] = {
-    // Explicit return type, consistent with removeEdges.
-    wrapGraph(jgraph.removeEdge(edge))
-  }
-
-  /**
-   * Removes all the edges that match the edges in the given list from the graph.
-   *
-   * @param edges the list of edges to be removed
-   * @return a new graph where the edges have been removed and in which the vertices remained intact
-   */
-  def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
-    // The wrapped Java graph expects a java.util.List, hence the asJava conversion.
-    wrapGraph(jgraph.removeEdges(edges.asJava))
-  }
-
-  /**
-   * Performs union on the vertices and edges sets of the input graphs
-   * removing duplicate vertices but maintaining duplicate edges.
-   *
-   * @param graph the graph to perform union with
-   * @return a new graph
-   */
-  def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV] = {
-    // The Java API operates on Java graphs, so unwrap the argument first.
-    wrapGraph(jgraph.union(graph.getWrappedGraph))
-  }
-
-  /**
-   * Performs difference on the vertex and edge sets of the input graphs,
-   * removing common vertices and edges. If a source/target vertex is removed,
-   * its corresponding edge will also be removed.
-   *
-   * @param graph the graph to perform difference with
-   * @return a new graph where the common vertices and edges have been removed
-   */
-  def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV] = {
-    // The Java API operates on Java graphs, so unwrap the argument first.
-    wrapGraph(jgraph.difference(graph.getWrappedGraph))
-  }
-
-  /**
-   * Compute an aggregate over the neighbor values of each vertex.
-   *
-   * @param reduceNeighborsFunction the function to apply to the neighborhood
-   * @param direction               the edge direction (in-, out-, all-)
-   * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
-   */
-  def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
-      direction: EdgeDirection): DataSet[(K, VV)] = {
-    val javaResult = wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction))
-    // Re-pack each Java Tuple2 as a Scala tuple.
-    javaResult.map(aggregate => (aggregate.f0, aggregate.f1))
-  }
-
-  /**
-   * Compute an aggregate over the edge values of each vertex.
-   *
-   * @param reduceEdgesFunction the function to apply to the neighborhood
-   * @param direction           the edge direction (in-, out-, all-)
-   * @return a Dataset containing one value per vertex (vertex key, aggregate edge value)
-   * @throws IllegalArgumentException
-   */
-  def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV],
-      direction: EdgeDirection): DataSet[(K, EV)] = {
-    val javaResult = wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction))
-    // Re-pack each Java Tuple2 as a Scala tuple.
-    javaResult.map(aggregate => (aggregate.f0, aggregate.f1))
-  }
-
-  /**
-   * Runs the given graph algorithm on the wrapped Java graph.
-   *
-   * @param algorithm the algorithm to run
-   * @tparam T the result type of the algorithm
-   * @return the value returned by the algorithm
-   */
-  def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T = {
-    val result: T = jgraph.run(algorithm)
-    result
-  }
-
-  /**
-   * Runs a Vertex-Centric iteration on the graph.
-   * No configuration options are provided.
-   *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
-   * @param maxIterations maximum number of iterations to perform
-   * @tparam M the message type shared by the update and messaging functions
-   *
-   * @return the updated Graph after the vertex-centric iteration has converged or
-   *         after maximumNumberOfIterations.
-   */
-  def runVertexCentricIteration[M](
-      vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-      messagingFunction: MessagingFunction[K, VV, M, EV],
-      maxIterations: Int): Graph[K, VV, EV] = {
-    val iterated = jgraph.runVertexCentricIteration(
-      vertexUpdateFunction, messagingFunction, maxIterations)
-    wrapGraph(iterated)
-  }
-
-  /**
-   * Runs a Vertex-Centric iteration on the graph with configuration options.
-   *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
-   * @param maxIterations maximum number of iterations to perform
-   * @param parameters the iteration configuration parameters
-   * @tparam M the message type shared by the update and messaging functions
-   *
-   * @return the updated Graph after the vertex-centric iteration has converged or
-   *         after maximumNumberOfIterations.
-   */
-  def runVertexCentricIteration[M](
-      vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-      messagingFunction: MessagingFunction[K, VV, M, EV],
-      maxIterations: Int,
-      parameters: VertexCentricConfiguration): Graph[K, VV, EV] = {
-    val iterated = jgraph.runVertexCentricIteration(
-      vertexUpdateFunction, messagingFunction, maxIterations, parameters)
-    wrapGraph(iterated)
-  }
-
-  /**
-   * Runs a Gather-Sum-Apply iteration on the graph.
-   * No configuration options are provided.
-   *
-   * @param gatherFunction the gather function collects information about adjacent
-   *                       vertices and edges
-   * @param sumFunction the sum function aggregates the gathered information
-   * @param applyFunction the apply function updates the vertex values with the aggregates
-   * @param maxIterations maximum number of iterations to perform
-   * @tparam M the intermediate type used between gather, sum and apply
-   *
-   * @return the updated Graph after the gather-sum-apply iteration has converged or
-   *         after maximumNumberOfIterations.
-   */
-  def runGatherSumApplyIteration[M](
-      gatherFunction: GatherFunction[VV, EV, M],
-      sumFunction: SumFunction[VV, EV, M],
-      applyFunction: ApplyFunction[K, VV, M],
-      maxIterations: Int): Graph[K, VV, EV] = {
-    val iterated = jgraph.runGatherSumApplyIteration(
-      gatherFunction, sumFunction, applyFunction, maxIterations)
-    wrapGraph(iterated)
-  }
-
-  /**
-   * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
-   *
-   * @param gatherFunction the gather function collects information about adjacent
-   *                       vertices and edges
-   * @param sumFunction the sum function aggregates the gathered information
-   * @param applyFunction the apply function updates the vertex values with the aggregates
-   * @param maxIterations maximum number of iterations to perform
-   * @param parameters the iteration configuration parameters
-   * @tparam M the intermediate type used between gather, sum and apply
-   *
-   * @return the updated Graph after the gather-sum-apply iteration has converged or
-   *         after maximumNumberOfIterations.
-   */
-  def runGatherSumApplyIteration[M](
-      gatherFunction: GatherFunction[VV, EV, M],
-      sumFunction: SumFunction[VV, EV, M],
-      applyFunction: ApplyFunction[K, VV, M],
-      maxIterations: Int,
-      parameters: GSAConfiguration): Graph[K, VV, EV] = {
-    val iterated = jgraph.runGatherSumApplyIteration(
-      gatherFunction, sumFunction, applyFunction, maxIterations, parameters)
-    wrapGraph(iterated)
-  }
-
-  /**
-   * Checks this graph with the given validator by delegating to the wrapped
-   * Java graph.
-   *
-   * @param validator the validator to apply
-   * @return the boolean result reported by the wrapped Java graph
-   */
-  def validate(validator: GraphValidator[K, VV, EV]): Boolean = {
-    val isValid: Boolean = jgraph.validate(validator)
-    isValid
-  }
-
-}