You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:40 UTC

[02/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
deleted file mode 100644
index 7553b32..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunction;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
-
-	public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testSumOfOutNeighbors() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-		
-		expectedResult = "1,5\n" +
-				"2,3\n" + 
-				"3,9\n" +
-				"4,5\n" + 
-				"5,1\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfInNeighbors() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-		
-		expectedResult = "1,255\n" +
-				"2,12\n" + 
-				"3,59\n" +
-				"4,102\n" + 
-				"5,285\n";
-		
-		compareResultAsTuples(result, expectedResult);
-		
-		
-	}
-
-	@Test
-	public void testSumOfOAllNeighbors() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * including own vertex value
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "1,11\n" +
-				"2,6\n" + 
-				"3,15\n" +
-				"4,12\n" + 
-				"5,13\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex with id greater than three.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-		
-		expectedResult = "4,5\n" +
-				"5,1\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfInNeighborsIdGreaterThanThree() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex with id greater than three.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum =
-				graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-		
-		expectedResult = "4,102\n" +
-				"5,285\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * including own vertex value
-		 * for each vertex with id greater than three.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "4,12\n" +
-				"5,13\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOutNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "1,5\n" +
-				"2,3\n" + 
-				"3,9\n" +
-				"4,5\n" + 
-				"5,1\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfInNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-	
-		expectedResult = "1,255\n" +
-				"2,12\n" +
-				"3,59\n" +
-				"4,102\n" +
-				"5,285\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfAllNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
-				graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
-	
-		expectedResult = "1,10\n" +
-				"2,4\n" + 
-				"3,12\n" +
-				"4,8\n" + 
-				"5,8\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex with id greater than two as well as the same sum multiplied by two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "3,9\n" +
-				"3,18\n" +
-				"4,5\n" +
-				"4,10\n" +
-				"5,1\n" +
-				"5,2";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * for each vertex with id greater than two as well as the same sum multiplied by two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "3,59\n" +
-				"3,118\n" +
-				"4,204\n" +
-				"4,102\n" +
-				"5,570\n" +
-				"5,285";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * for each vertex with id greater than two as well as the same sum multiplied by two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
-				graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
-
-		expectedResult = "3,12\n" +
-				"3,24\n" +
-				"4,8\n" +
-				"4,16\n" +
-				"5,8\n" +
-				"5,16";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOutNeighborsMultipliedByTwo() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex as well as the sum of out-neighbor values multiplied by two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-		
-		expectedResult = "1,5\n" +
-				"1,10\n" +
-				"2,3\n" +
-				"2,6\n" +
-				"3,9\n" +
-				"3,18\n" +
-				"4,5\n" +
-				"4,10\n" +
-				"5,1\n" +
-				"5,2";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfInNeighborsSubtractOne() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex as well as the same sum minus one.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum =
-				graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-		
-		expectedResult = "1,255\n" +
-				"1,254\n" +
-				"2,12\n" +
-				"2,11\n" +
-				"3,59\n" +
-				"3,58\n" +
-				"4,102\n" +
-				"4,101\n" +
-				"5,285\n" +
-				"5,284";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testSumOfOAllNeighborsAddFive() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * including own vertex value
-		 * for each vertex as well as the same sum plus five.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-				graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
-		expectedResult = "1,11\n" +
-				"1,16\n" +
-				"2,6\n" +
-				"2,11\n" +
-				"3,15\n" +
-				"3,20\n" +
-				"4,12\n" +
-				"4,17\n" +
-				"5,13\n" +
-				"5,18";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-	Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
-			
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
-		
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-	
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-					sum += neighbor.f1.getValue();
-			}
-			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
-			}
-			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
-
-		@Override
-		public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
-			return firstNeighbor + secondNeighbor;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue() * next.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(next.f0, sum));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue();
-			}
-			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue() * next.f1.getValue();
-			}
-			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue();
-			}
-			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
deleted file mode 100644
index b32abeb..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class ReduceOnNeighborsWithExceptionITCase {
-
-	private static final int PARALLELISM = 4;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-
-	@BeforeClass
-	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Cluster shutdown caused an exception: " + t.getMessage());
-		}
-	}
-
-	/**
-	 * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
-	 * with an edge having a srcId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-					graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
-			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	/**
-	 * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
-	 * with an edge having a trgId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
-					graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
-			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	/**
-	 * Test groupReduceOnNeighbors() -NeighborsFunction-
-	 * with an edge having a srcId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
-					graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-
-			verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	/**
-	 * Test groupReduceOnNeighbors() -NeighborsFunction-
-	 * with an edge having a trgId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
-					graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-
-			verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
-			Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateNeighbors(Vertex<Long, Long> vertex,
-									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
-									 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
-
-		@Override
-		public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
-			return firstNeighbor + secondNeighbor;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 67aec5a..7bc76a7 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -40,12 +40,10 @@ under the License.
 		<module>flink-hadoop-compatibility</module>
 		<module>flink-streaming</module>
 		<module>flink-hbase</module>
-		<module>flink-gelly</module>
 		<module>flink-hcatalog</module>
 		<module>flink-table</module>
 		<module>flink-ml</module>
 		<module>flink-language-binding</module>
-		<module>flink-gelly-scala</module>
 		<module>flink-scala-shell</module>
 	</modules>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e90ad6..63ffa55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ under the License.
 		<module>flink-tests</module>
 		<module>flink-test-utils</module>
 		<module>flink-staging</module>
+		<module>flink-libraries</module>
 		<module>flink-quickstart</module>
 		<module>flink-contrib</module>
 		<module>flink-dist</module>