You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:16 UTC

[2/7] flink git commit: [FLINK-1962] Add Gelly Scala API

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
new file mode 100644
index 0000000..4b8f354
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.VertexToTuple2Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSet {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
+        VertexToTuple2Map[Long, Long]), new AddValuesMapper)
+    result.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
+      (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
+    result.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+  }
+
+
+  /** Mapper handed to `joinWithVertices`; returns the sum of the two fields of its input. */
+  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+    @throws(classOf[Exception])
+    def map(tuple: (Long, Long)): Long =
+      tuple._1 + tuple._2
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
new file mode 100644
index 0000000..7e5ad14
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  // Runs AddOneMapper over the edges; the @After hook compares the CSV with expectedResult.
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.mapEdges(new AddOneMapper).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,13\n" +
+      "1,3,14\n" +
+      "2,3,24\n" +
+      "3,4,35\n" +
+      "3,5,36\n" +
+      "4,5,46\n" +
+      "5,1,52\n"
+  }
+
+  // Same check as testWithSameValue, but the mapper is passed as a Scala function literal.
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.mapEdges(edge => edge.getValue + 1).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,13\n" +
+      "1,3,14\n" +
+      "2,3,24\n" +
+      "3,4,35\n" +
+      "3,5,36\n" +
+      "4,5,46\n" +
+      "5,1,52\n"
+  }
+
+  /** Mapper handed to `mapEdges`; returns the value of the given edge plus one. */
+  final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): Long =
+      edge.getValue + 1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
new file mode 100644
index 0000000..a22cfbd
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.mapVertices(new AddOneMapper)
+      .getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute
+
+    expectedResult = "1,2\n" +
+      "2,3\n" +
+      "3,4\n" +
+      "4,5\n" +
+      "5,6\n";
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.mapVertices(vertex => vertex.getValue + 1)
+      .getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute
+
+    expectedResult = "1,2\n" +
+      "2,3\n" +
+      "3,4\n" +
+      "4,5\n" +
+      "5,6\n";
+  }
+
+  /** Mapper handed to `mapVertices`; returns the value of the given vertex plus one. */
+  final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(vertex: Vertex[Long, Long]): Long =
+      vertex.getValue + 1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
new file mode 100644
index 0000000..6ed383a
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighborsWithValueGreaterThanFour {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+      EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute
+
+
+    expectedResult = "5,1\n" + "5,3\n" + "5,4"
+  }
+
+
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighbors {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute
+
+
+    expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" +
+      "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightOutNeighborNoValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
+    verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightInNeighborNoValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMinWeightNeighborNoValue, EdgeDirection.IN)
+    verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testMaxWeightAllNeighbors {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
+        SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
+    verticesWithMaxEdgeWeight.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
+  }
+
+  /** Emits (vertex id, neighbor id) for every edge of a vertex whose value exceeds four;
+    * vertices with a smaller value produce no output. */
+  final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
+    Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
+    Collector[(Long, Long)]): Unit = {
+      // The value test does not depend on the edge, so it guards the whole loop.
+      if (v.getValue > 4) {
+        for (edge <- edges) {
+          // The neighbor is whichever endpoint of the edge is not this vertex.
+          val neighborId = if (v.getId == edge.getTarget) edge.getSource else edge.getTarget
+          out.collect((v.getId, neighborId))
+        }
+      }
+    }
+  }
+
+  /** Emits one (vertex id, neighbor id) pair for every (vertex id, edge) entry. */
+  final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[
+      (Long, Long)]) {
+      for (entry <- edges) {
+        val vertexId = entry._1
+        val edge = entry._2
+        // The neighbor is the endpoint of the edge that is not the keyed vertex.
+        val neighborId = if (vertexId == edge.getTarget) edge.getSource else edge.getTarget
+        out.collect((vertexId, neighborId))
+      }
+    }
+  }
+
+  /** Reducer that keeps the smaller of two edge values. */
+  final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      math.min(firstEdgeValue, secondEdgeValue)
+  }
+
+  /** Reducer that keeps the larger of two edge values. */
+  final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      math.max(firstEdgeValue, secondEdgeValue)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
new file mode 100644
index 0000000..52e6d7a
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
+import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighborsNoValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfOutNeighborsNoValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighbors {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
+  }
+
+  // Declared `: Unit` explicitly because JUnit 4 rejects test methods that are not void.
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo: Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo,
+      EdgeDirection.IN).writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
+  }
+
+  /** Reducer that adds two neighbor values. */
+  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
+    override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long =
+      firstNeighbor + secondNeighbor
+  }
+
+  /** Emits one pair per vertex: its id and the sum of its own value and the values
+    * of all supplied neighbor vertices. */
+  final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
+    Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
+      Vertex[Long, Long])], out: Collector[(Long, Long)]) {
+      // Seed the fold with the vertex's own value so it is part of the total.
+      val total = neighbors.foldLeft(vertex.getValue)((acc, n) => acc + n._2.getValue)
+      out.collect((vertex.getId, total))
+    }
+  }
+
+  /** Emits (id, sum) and (id, 2 * sum) for ids > 2; sum adds neighbor value * edge value. */
+  final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
+  NeighborsFunction[Long, Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
+                         out: Collector[(Long, Long)]) {
+      // Keep the id in an Option so an empty group emits nothing instead of hitting a null.
+      var vertexId: Option[Long] = None
+      var sum: Long = 0
+      for (neighbor <- neighbors) {
+        vertexId = Some(neighbor._1)
+        sum += neighbor._3.getValue * neighbor._2.getValue
+      }
+      for (id <- vertexId if id > 2) {
+        out.collect((id, sum))
+        out.collect((id, sum * 2))
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index ff27949..8552c01 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -383,6 +383,17 @@ public class Graph<K, VV, EV> {
 		TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
 				Vertex.class, keyType, valueType);
 
+		return mapVertices(mapper, returnType);
+	}
+
+	/**
+	 * Apply a function to the attribute of each vertex in the graph.
+	 *
+	 * @param mapper the map function to apply.
+	 * @param returnType the explicit return type.
+	 * @return a new graph
+	 */
+	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
 		DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
 				new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
 					public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
@@ -411,6 +422,18 @@ public class Graph<K, VV, EV> {
 		TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
 				Edge.class, keyType, keyType, valueType);
 
+		return mapEdges(mapper, returnType);
+	}
+
+	/**
+	 * Apply a function to the attribute of each edge in the graph.
+	 *
+	 * @param mapper the map function to apply.
+	 * @param returnType the explicit return type.
+	 * @return a new graph
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
 		DataSet<Edge<K, NV>> mappedEdges = edges.map(
 				new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
 					public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
@@ -754,6 +777,38 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Compute an aggregate over the edges of each vertex. The function applied
+	 * on the edges has access to the vertex value.
+	 *
+	 * @param edgesFunction
+	 *            the function to apply to the neighborhood
+	 * @param direction
+	 *            the edge direction (in-, out-, all-)
+	 * @param <T>
+	 *            the output type
+	 * @param typeInfo the explicit return type.
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the edge direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		switch (direction) {
+			case IN:
+				return vertices.coGroup(edges).where(0).equalTo(1)
+						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+			case OUT:
+				return vertices.coGroup(edges).where(0).equalTo(0)
+						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+			case ALL:
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
+						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+	/**
+	 * Compute an aggregate over the edges of each vertex. The function applied
 	 * on the edges only has access to the vertex id (not the vertex value).
 	 * 
 	 * @param edgesFunction
@@ -785,6 +840,40 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
+	/**
+	 * Compute an aggregate over the edges of each vertex. The function applied
+	 * on the edges only has access to the vertex id (not the vertex value).
+	 *
+	 * @param edgesFunction
+	 *            the function to apply to the neighborhood
+	 * @param direction
+	 *            the edge direction (in-, out-, all-)
+	 * @param <T>
+	 *            the output type
+	 * @param typeInfo the explicit return type.
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the edge direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		switch (direction) {
+			case IN:
+				return edges.map(new ProjectVertexIdMap<K, EV>(1))
+						.withForwardedFields("f1->f0")
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+			case OUT:
+				return edges.map(new ProjectVertexIdMap<K, EV>(0))
+						.withForwardedFields("f0")
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+			case ALL:
+				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
 	private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
 		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
@@ -1412,6 +1501,51 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Compute an aggregate over the neighbors (edges and vertices) of each
+	 * vertex. The neighborhoods are co-grouped with the vertex data set, so
+	 * the applied function also has access to the value of the vertex whose
+	 * neighborhood it processes.
+	 *
+	 * @param neighborsFunction the function to apply to the neighborhood
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @param <T> the output type
+	 * @param typeInfo the explicit type information of the result, handed to returns()
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the direction is none of IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+		switch (direction) {
+			case IN: {
+				// pair every edge with the vertex at its source (edge field 0)
+				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgeAndSource =
+						edges.join(this.vertices).where(0).equalTo(0);
+
+				// match each vertex with the pairs whose edge ends at it (edge field 1)
+				return vertices
+						.coGroup(edgeAndSource)
+						.where(0).equalTo("f0.f1")
+						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
+						.returns(typeInfo);
+			}
+			case OUT: {
+				// pair every edge with the vertex at its target (edge field 1)
+				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgeAndTarget =
+						edges.join(this.vertices).where(1).equalTo(0);
+
+				// match each vertex with the pairs whose edge starts at it (edge field 0)
+				return vertices
+						.coGroup(edgeAndTarget)
+						.where(0).equalTo("f0.f0")
+						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
+						.returns(typeInfo);
+			}
+			case ALL: {
+				// build <key, edge, neighbor vertex> triples: the flat map's field 1
+				// is joined against the vertex ids
+				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keyedEdgeAndNeighbor = edges
+						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+
+				return vertices
+						.coGroup(keyedEdgeAndNeighbor)
+						.where(0).equalTo(0)
+						.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction))
+						.returns(typeInfo);
+			}
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+
+	/**
+	 * Compute an aggregate over the neighbors (edges and vertices) of each
 	 * vertex. The function applied on the neighbors only has access to the
 	 * vertex id (not the vertex value).
 	 * 
@@ -1454,6 +1588,51 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
+	/**
+	 * Groups, per vertex id, the edges of the graph together with the vertex
+	 * found at the other end of each edge, and applies a user-defined
+	 * function to every group. The function only has access to the id of the
+	 * vertex whose neighborhood it processes (not to its value).
+	 *
+	 * @param neighborsFunction the function to apply to the neighborhood
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @param <T> the output type
+	 * @param typeInfo the explicit type information of the result, handed to returns()
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the direction is none of IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		// <key, edge, neighbor vertex> triples; the direction only decides how
+		// they are built, the group-reduce below is shared by all directions
+		DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keyedEdgeAndNeighbor;
+
+		switch (direction) {
+			case IN:
+				// neighbor = source vertex (edge field 0), key = edge field 1
+				keyedEdgeAndNeighbor = edges
+						.join(this.vertices).where(0).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(1))
+						.withForwardedFieldsFirst("f1->f0");
+				break;
+			case OUT:
+				// neighbor = target vertex (edge field 1), key = edge field 0
+				keyedEdgeAndNeighbor = edges
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(0))
+						.withForwardedFieldsFirst("f0");
+				break;
+			case ALL:
+				// the flat map's field 1 is joined against the vertex ids
+				keyedEdgeAndNeighbor = edges
+						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		return keyedEdgeAndNeighbor
+				.groupBy(0)
+				.reduceGroup(new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction))
+				.returns(typeInfo);
+	}
+
 	private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
 			implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index b3aec14..a0cda67 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
 		<module>flink-table</module>
 		<module>flink-ml</module>
 		<module>flink-language-binding</module>
+		<module>flink-gelly-scala</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->