You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:40 UTC
[02/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
deleted file mode 100644
index 7553b32..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunction;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
-
- public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){
- super(mode);
- }
-
- private String expectedResult;
-
- @Test
- public void testSumOfOutNeighbors() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfInNeighbors() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
- List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-
- expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
-
- compareResultAsTuples(result, expectedResult);
-
-
- }
-
- @Test
- public void testSumOfOAllNeighbors() throws Exception {
- /*
- * Get the sum of all neighbor values
- * including own vertex value
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "1,11\n" +
- "2,6\n" +
- "3,15\n" +
- "4,12\n" +
- "5,13\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex with id greater than three.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "4,5\n" +
- "5,1\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfInNeighborsIdGreaterThanThree() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex with id greater than three.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
- List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-
- expectedResult = "4,102\n" +
- "5,285\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception {
- /*
- * Get the sum of all neighbor values
- * including own vertex value
- * for each vertex with id greater than three.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "4,12\n" +
- "5,13\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOutNeighborsNoValue() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfInNeighborsNoValue() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
- List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-
- expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfAllNeighborsNoValue() throws Exception {
- /*
- * Get the sum of all neighbor values
- * for each vertex
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
- List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
-
- expectedResult = "1,10\n" +
- "2,4\n" +
- "3,12\n" +
- "4,8\n" +
- "5,8\n";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex with id greater than two as well as the same sum multiplied by two.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "3,9\n" +
- "3,18\n" +
- "4,5\n" +
- "4,10\n" +
- "5,1\n" +
- "5,2";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
- /*
- * Get the sum of in-neighbor values times the edge weights
- * for each vertex with id greater than two as well as the same sum multiplied by two.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "3,59\n" +
- "3,118\n" +
- "4,204\n" +
- "4,102\n" +
- "5,570\n" +
- "5,285";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
- /*
- * Get the sum of all neighbor values
- * for each vertex with id greater than two as well as the same sum multiplied by two.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
- List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
-
- expectedResult = "3,12\n" +
- "3,24\n" +
- "4,8\n" +
- "4,16\n" +
- "5,8\n" +
- "5,16";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOutNeighborsMultipliedByTwo() throws Exception {
- /*
- * Get the sum of out-neighbor values
- * for each vertex as well as the sum of out-neighbor values multiplied by two.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "1,5\n" +
- "1,10\n" +
- "2,3\n" +
- "2,6\n" +
- "3,9\n" +
- "3,18\n" +
- "4,5\n" +
- "4,10\n" +
- "5,1\n" +
- "5,2";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfInNeighborsSubtractOne() throws Exception {
- /*
- * Get the sum of in-neighbor values
- * times the edge weights for each vertex as well as the same sum minus one.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
- List<Tuple2<Long,Long>> result = verticesWithSum.collect();
-
- expectedResult = "1,255\n" +
- "1,254\n" +
- "2,12\n" +
- "2,11\n" +
- "3,59\n" +
- "3,58\n" +
- "4,102\n" +
- "4,101\n" +
- "5,285\n" +
- "5,284";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @Test
- public void testSumOfOAllNeighborsAddFive() throws Exception {
- /*
- * Get the sum of all neighbor values
- * including own vertex value
- * for each vertex as well as the same sum plus five.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
-
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
- List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
- expectedResult = "1,11\n" +
- "1,16\n" +
- "2,6\n" +
- "2,11\n" +
- "3,15\n" +
- "3,20\n" +
- "4,12\n" +
- "4,17\n" +
- "5,13\n" +
- "5,18";
-
- compareResultAsTuples(result, expectedResult);
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f0.getValue() * neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- if(vertex.getId() > 3) {
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f0.getValue() * neighbor.f1.getValue();
- }
- if(vertex.getId() > 3) {
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- if(vertex.getId() > 3) {
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
-
- @Override
- public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
- return firstNeighbor + secondNeighbor;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue() * next.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(next.f0, sum));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue();
- }
- if(next.f0 > 2) {
- out.collect(new Tuple2<Long, Long>(next.f0, sum));
- out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue() * next.f1.getValue();
- }
- if(next.f0 > 2) {
- out.collect(new Tuple2<Long, Long>(next.f0, sum));
- out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
- Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
- neighbors.iterator();
- while(neighborsIterator.hasNext()) {
- next = neighborsIterator.next();
- sum += next.f2.getValue();
- }
- if(next.f0 > 2) {
- out.collect(new Tuple2<Long, Long>(next.f0, sum));
- out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f0.getValue() * neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
deleted file mode 100644
index b32abeb..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class ReduceOnNeighborsWithExceptionITCase {
-
- private static final int PARALLELISM = 4;
-
- private static ForkableFlinkMiniCluster cluster;
-
-
- @BeforeClass
- public static void setupCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Error starting test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDownCluster() {
- try {
- cluster.stop();
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Cluster shutdown caused an exception: " + t.getMessage());
- }
- }
-
- /**
- * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
- * with an edge having a srcId that does not exist in the vertex DataSet
- */
- @Test
- public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
-
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
-
- try {
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- env.execute();
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- /**
- * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue-
- * with an edge having a trgId that does not exist in the vertex DataSet
- */
- @Test
- public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
-
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
-
- try {
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- env.execute();
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- /**
- * Test reduceOnNeighbors() -ReduceNeighborsFunction-
- * with an edge having a srcId that does not exist in the vertex DataSet
- */
- @Test
- public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
-
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
-
- try {
- DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- env.execute();
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- /**
- * Test reduceOnNeighbors() -ReduceNeighborsFunction-
- * with an edge having a trgId that does not exist in the vertex DataSet
- */
- @Test
- public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
-
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
-
- try {
- DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- env.execute();
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- @Override
- public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- long sum = 0;
- for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
- }
- out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumNeighbors implements ReduceNeighborsFunction<Long> {
-
- @Override
- public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
- return firstNeighbor + secondNeighbor;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 67aec5a..7bc76a7 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -40,12 +40,10 @@ under the License.
<module>flink-hadoop-compatibility</module>
<module>flink-streaming</module>
<module>flink-hbase</module>
- <module>flink-gelly</module>
<module>flink-hcatalog</module>
<module>flink-table</module>
<module>flink-ml</module>
<module>flink-language-binding</module>
- <module>flink-gelly-scala</module>
<module>flink-scala-shell</module>
</modules>
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e90ad6..63ffa55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ under the License.
<module>flink-tests</module>
<module>flink-test-utils</module>
<module>flink-staging</module>
+ <module>flink-libraries</module>
<module>flink-quickstart</module>
<module>flink-contrib</module>
<module>flink-dist</module>