You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:06:00 UTC

[22/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
new file mode 100644
index 0000000..2e51d90
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+/**
+ * Integration tests for `mapVertices` on the Gelly Scala `Graph`.
+ *
+ * Both tests add one to every vertex value of the long/long graph built from
+ * `TestGraphUtils` and compare the collected vertices with the expected tuples.
+ */
+@RunWith(classOf[Parameterized])
+class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // Expected (id, mapped value) tuples; identical for both mapper variants.
+  private val expectedResult: String = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"
+
+  /** Maps the vertex values with an explicit `MapFunction` implementation. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  /** Maps the vertex values with a Scala lambda instead of a `MapFunction`. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  /** Vertex mapper that returns the vertex value incremented by one. */
+  final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(vertex: Vertex[Long, Long]): Long = {
+      vertex.getValue + 1
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
new file mode 100644
index 0000000..dcd1deb
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+/**
+ * Integration tests for the edge-based neighborhood methods of the Gelly Scala
+ * `Graph`: `groupReduceOnEdges` and `reduceOnEdges`.
+ */
+@RunWith(classOf[Parameterized])
+class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  /** Only vertices with a value above four emit their (vertex id, neighbor id) pairs. */
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighborsWithValueGreaterThanFour(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+      EdgeDirection.ALL).collect().toList
+    val expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Every vertex emits one (vertex id, neighbor id) pair per edge in either direction. */
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+      .collect().toList
+    val expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
+      "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
+      "(5,1)\n" + "(5,3)\n" + "(5,4)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Reduces the values of each vertex's OUT-direction edges to their minimum. */
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightOutNeighborNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
+    val res = verticesWithLowestOutNeighbor.collect().toList
+    val expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Reduces the values of each vertex's IN-direction edges to their minimum. */
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightInNeighborNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithLowestInNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMinWeightNeighborNoValue, EdgeDirection.IN)
+    val res = verticesWithLowestInNeighbor.collect().toList
+    val expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Reduces the values of all edges of each vertex (both directions) to their maximum. */
+  @Test
+  @throws(classOf[Exception])
+  def testMaxWeightAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
+    val res = verticesWithMaxEdgeWeight.collect().toList
+    val expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Emits (vertex id, neighbor id) for every edge of a vertex whose value exceeds four. */
+  final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
+    Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
+    Collector[(Long, Long)]): Unit = {
+      if (v.getValue > 4) {
+        for (edge <- edges) {
+          val neighborId = if (v.getId == edge.getTarget) edge.getSource else edge.getTarget
+          out.collect((v.getId, neighborId))
+        }
+      }
+    }
+  }
+
+  /** Emits (vertex id, neighbor id) for every (vertex id, edge) pair it receives. */
+  final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])],
+                              out: Collector[(Long, Long)]): Unit = {
+      for ((vertexId, edge) <- edges) {
+        val neighborId = if (vertexId == edge.getTarget) edge.getSource else edge.getTarget
+        out.collect((vertexId, neighborId))
+      }
+    }
+  }
+
+  /** Edge reducer that keeps the smaller of two edge values. */
+  final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      Math.min(firstEdgeValue, secondEdgeValue)
+  }
+
+  /** Edge reducer that keeps the larger of two edge values. */
+  final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      Math.max(firstEdgeValue, secondEdgeValue)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
new file mode 100644
index 0000000..aef5493
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
+import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+/** Integration tests for `reduceOnNeighbors` and `groupReduceOnNeighbors`. */
+@RunWith(classOf[Parameterized])
+class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  /** Sums the neighbor values of each vertex, using EdgeDirection.ALL. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighborsNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).collect().toList
+    val expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Sums the neighbor values of each vertex, using EdgeDirection.OUT. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfOutNeighborsNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList
+    val expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Adds each vertex's own value to the sum of the values of all its neighbors. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
+    val res = result.collect().toList
+    val expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Edge-weighted neighbor sum (EdgeDirection.IN), emitted plain and doubled for ids > 2. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result = graph.groupReduceOnNeighbors(new
+        SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
+    val res = result.collect().toList
+    val expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" +
+      "(5,570)\n" + "(5,285)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  /** Neighbor reducer that adds two neighbor values. */
+  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
+    override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long =
+      firstNeighbor + secondNeighbor
+  }
+
+  /** Emits (vertex id, sum of the neighbor values + the vertex's own value). */
+  final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
+    Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
+      Vertex[Long, Long])], out: Collector[(Long, Long)]): Unit = {
+      val sum = neighbors.iterator.map(_._2.getValue).sum
+      out.collect((vertex.getId, sum + vertex.getValue))
+    }
+  }
+
+  /**
+   * Sums neighbor value * edge value over the supplied neighbors and, when the vertex id
+   * exceeds two, emits (id, sum) and (id, sum * 2); an empty neighborhood emits nothing.
+   */
+  final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
+  NeighborsFunction[Long, Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
+                         out: Collector[(Long, Long)]): Unit = {
+      val (lastId, sum) = neighbors.foldLeft((Option.empty[Long], 0L)) {
+        case ((_, acc), (id, edge, vertex)) => (Some(id), acc + vertex.getValue * edge.getValue)
+      }
+      lastId.filter(_ > 2).foreach { id =>
+        out.collect((id, sum))
+        out.collect((id, sum * 2))
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
new file mode 100644
index 0000000..7fd95ed
--- /dev/null
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	
+	<artifactId>flink-gelly</artifactId>
+	<name>flink-gelly</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies> <!-- org.apache.flink artifacts below all use this module's own version -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope> <!-- needed only when compiling and running tests -->
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-optimizer</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type> <!-- the artifact's test classes, not its main jar -->
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version> <!-- property is not defined in this file; presumably inherited from a parent POM (confirm) -->
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
new file mode 100644
index 0000000..d84badb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * An Edge links two {@link Vertex vertices}, its source and its target, and can
+ * carry an attached value. It is stored as a {@link Tuple3} of (source, target, value).
+ * For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type.
+ *
+ * @param <K> the key type of the source and target vertices
+ * @param <V> the edge value type
+ */
+public class Edge<K, V> extends Tuple3<K, K, V> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an edge without assigning a source, target or value. */
+	public Edge() {}
+
+	/** Creates an edge from {@code src} to {@code trg} that carries {@code val}. */
+	public Edge(K src, K trg, V val) {
+		super(src, trg, val);
+	}
+
+	/**
+	 * Creates the edge that points the opposite way; this edge is left untouched.
+	 * @return a new Edge whose source is this Edge's target and whose target is
+	 * this Edge's source, carrying the same value
+	 */
+	public Edge<K, V> reverse() {
+		return new Edge<K, V>(f1, f0, f2);
+	}
+
+	public K getSource() {
+		return f0;
+	}
+
+	public void setSource(K src) {
+		f0 = src;
+	}
+
+	public K getTarget() {
+		return f1;
+	}
+
+	public void setTarget(K target) {
+		f1 = target;
+	}
+
+	public V getValue() {
+		return f2;
+	}
+
+	public void setValue(V value) {
+		f2 = value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
new file mode 100644
index 0000000..0a055bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * Selects which part of a vertex's neighborhood is passed to the
+ * {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and
+ * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
+ * methods.
+ */
+public enum EdgeDirection {
+	IN, // presumably the edges that point at the vertex (TODO confirm against Graph)
+	OUT, // presumably the edges that leave the vertex (TODO confirm against Graph)
+	ALL // presumably both of the above (TODO confirm against Graph)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
new file mode 100644
index 0000000..bf1d6a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
+ *
+ * @param <K> the vertex key type
+ * @param <EV> the edge value type
+ * @param <O> the type of the values emitted through the collector
+ */
+public interface EdgesFunction<K, EV, O> extends Function, Serializable {
+	/** Receives a neighborhood as (vertex key, edge) pairs and emits results through {@code out}. */
+	void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
new file mode 100644
index 0000000..0b0ab0e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
+ * method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <O> the type of the values emitted through the collector
+ */
+public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
+	/** Receives a vertex together with the edges of its neighborhood and emits results through {@code out}. */
+	void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
+}