You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:06:00 UTC
[22/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
new file mode 100644
index 0000000..2e51d90
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithSameValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList
+ expectedResult = "1,2\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,6\n";
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithSameValueSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList
+ expectedResult = "1,2\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,6\n";
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
+ @throws(classOf[Exception])
+ def map(vertex: Vertex[Long, Long]): Long = {
+ vertex.getValue + 1
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
new file mode 100644
index 0000000..dcd1deb
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+ extends MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testAllNeighborsWithValueGreaterThanFour {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+ EdgeDirection.ALL).collect().toList
+ expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+
+ @Test
+ @throws(classOf[Exception])
+ def testAllNeighbors {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+ .collect().toList
+ expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
+ "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
+ "(5,1)\n" + "(5,3)\n" + "(5,4)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testLowestWeightOutNeighborNoValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+ SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
+ val res = verticesWithLowestOutNeighbor.collect().toList
+ expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testLowestWeightInNeighborNoValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+ SelectMinWeightNeighborNoValue, EdgeDirection.IN)
+ val res = verticesWithLowestOutNeighbor.collect().toList
+ expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testMaxWeightAllNeighbors {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+ SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
+ val res = verticesWithMaxEdgeWeight.collect().toList
+ expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
+ Long, Long, (Long, Long)] {
+ @throws(classOf[Exception])
+ override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
+ Collector[(Long, Long)]): Unit = {
+ for (edge <- edges) {
+ if (v.getValue > 4) {
+ if (v.getId == edge.getTarget) {
+ out.collect((v.getId, edge.getSource))
+ }
+ else {
+ out.collect((v.getId, edge.getTarget))
+ }
+ }
+ }
+ }
+ }
+
+ final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
+ @throws(classOf[Exception])
+ override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[
+ (Long, Long)]) {
+ for (edge <- edges) {
+ if (edge._1.equals(edge._2.getTarget)) {
+ out.collect(new Tuple2[Long, Long](edge._1, edge._2.getSource))
+ }
+ else {
+ out.collect(new Tuple2[Long, Long](edge._1, edge._2.getTarget))
+ }
+ }
+ }
+ }
+
+ final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+ override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
+ Math.min(firstEdgeValue, secondEdgeValue)
+ }
+ }
+
+ final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+ override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
+ Math.max(firstEdgeValue, secondEdgeValue)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
new file mode 100644
index 0000000..aef5493
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
+import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+ extends MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testSumOfAllNeighborsNoValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
+ .collect().toList
+ expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testSumOfOutNeighborsNoValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList
+ expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testSumOfAllNeighbors {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
+ val res = result.collect().toList
+ expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result = graph.groupReduceOnNeighbors(new
+ SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
+ val res = result.collect().toList
+ expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ final class SumNeighbors extends ReduceNeighborsFunction[Long] {
+ override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = {
+ firstNeighbor + secondNeighbor
+ }
+ }
+
+ final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
+ Long)] {
+ @throws(classOf[Exception])
+ def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
+ Vertex[Long, Long])], out: Collector[(Long, Long)]) {
+ var sum: Long = 0
+ for (neighbor <- neighbors) {
+ sum += neighbor._2.getValue
+ }
+ out.collect((vertex.getId, sum + vertex.getValue))
+ }
+ }
+
+ final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
+ NeighborsFunction[Long, Long, Long, (Long, Long)] {
+ @throws(classOf[Exception])
+ def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
+ out: Collector[(Long, Long)]) {
+ var sum: Long = 0
+ var next: (Long, Edge[Long, Long], Vertex[Long, Long]) = null
+ val neighborsIterator: Iterator[(Long, Edge[Long, Long], Vertex[Long, Long])] =
+ neighbors.iterator
+ while (neighborsIterator.hasNext) {
+ next = neighborsIterator.next
+ sum += next._3.getValue * next._2.getValue
+ }
+ if (next._1 > 2) {
+ out.collect(new Tuple2[Long, Long](next._1, sum))
+ out.collect(new Tuple2[Long, Long](next._1, sum * 2))
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
new file mode 100644
index 0000000..7fd95ed
--- /dev/null
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-libraries</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-gelly</artifactId>
+ <name>flink-gelly</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-optimizer</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
new file mode 100644
index 0000000..d84badb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * An Edge represents a link between two {@link Vertex vertices},
+ * the source and the target and can carry an attached value.
+ * For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type.
+ *
+ * @param <K> the key type for the sources and target vertices
+ * @param <V> the edge value type
+ */
+public class Edge<K, V> extends Tuple3<K, K, V>{
+
+ private static final long serialVersionUID = 1L;
+
+ public Edge(){}
+
+ public Edge(K src, K trg, V val) {
+ this.f0 = src;
+ this.f1 = trg;
+ this.f2 = val;
+ }
+
+ /**
+ * Reverses the direction of this Edge.
+ * @return a new Edge, where the source is the original Edge's target
+ * and the target is the original Edge's source.
+ */
+ public Edge<K, V> reverse() {
+ return new Edge<K, V>(this.f1, this.f0, this.f2);
+ }
+
+ public void setSource(K src) {
+ this.f0 = src;
+ }
+
+ public K getSource() {
+ return this.f0;
+ }
+
+ public void setTarget(K target) {
+ this.f1 = target;
+ }
+
+ public K getTarget() {
+ return f1;
+ }
+
+ public void setValue(V value) {
+ this.f2 = value;
+ }
+
+ public V getValue() {
+ return f2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
new file mode 100644
index 0000000..0a055bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * The EdgeDirection is used to select a node's neighborhood
+ * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and
+ * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
+ * methods.
+ */
+public enum EdgeDirection {
+ IN,
+ OUT,
+ ALL
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
new file mode 100644
index 0000000..bf1d6a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
+ *
+ * @param <K> the vertex key type
+ * @param <EV> the edge value type
+ * @param <O> the type of the return value
+ */
+public interface EdgesFunction<K, EV, O> extends Function, Serializable {
+
+ void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
new file mode 100644
index 0000000..0b0ab0e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
+ * method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <O> the type of the return value
+ */
+public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
+
+ void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
+}