You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:06:02 UTC

[24/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

[FLINK-2833] [gelly] create a flink-libraries module and move gelly there

This closes #1241


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91ffbc1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91ffbc1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91ffbc1e

Branch: refs/heads/master
Commit: 91ffbc1e3b626b93398271a6ea10c57b18459339
Parents: 71d5a39
Author: vasia <va...@apache.org>
Authored: Thu Oct 8 11:48:06 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Oct 9 17:33:15 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |    6 +-
 flink-libraries/flink-gelly-scala/pom.xml       |  204 ++
 .../flink/graph/scala/EdgesFunction.scala       |   35 +
 .../scala/EdgesFunctionWithVertexValue.scala    |   33 +
 .../org/apache/flink/graph/scala/Graph.scala    | 1014 +++++++++
 .../flink/graph/scala/NeighborsFunction.scala   |   37 +
 .../NeighborsFunctionWithVertexValue.scala      |   40 +
 .../scala/example/ConnectedComponents.scala     |  121 ++
 .../example/GSASingleSourceShortestPaths.scala  |  156 ++
 .../graph/scala/example/GraphMetrics.scala      |  128 ++
 .../example/SingleSourceShortestPaths.scala     |  170 ++
 .../org/apache/flink/graph/scala/package.scala  |   30 +
 .../graph/scala/utils/EdgeToTuple3Map.scala     |   30 +
 .../graph/scala/utils/Tuple2ToVertexMap.scala   |   30 +
 .../graph/scala/utils/Tuple3ToEdgeMap.scala     |   30 +
 .../graph/scala/utils/VertexToTuple2Map.scala   |   30 +
 .../test/GellyScalaAPICompletenessTest.scala    |   43 +
 .../flink/graph/scala/test/TestGraphUtils.scala |   55 +
 .../scala/test/operations/DegreesITCase.scala   |   69 +
 .../operations/GraphCreationWithCsvITCase.scala |  225 ++
 .../test/operations/GraphMutationsITCase.scala  |  260 +++
 .../test/operations/GraphOperationsITCase.scala |  282 +++
 .../test/operations/JoinWithEdgesITCase.scala   |  151 ++
 .../operations/JoinWithVerticesITCase.scala     |   74 +
 .../scala/test/operations/MapEdgesITCase.scala  |   81 +
 .../test/operations/MapVerticesITCase.scala     |   76 +
 .../operations/ReduceOnEdgesMethodsITCase.scala |  150 ++
 .../ReduceOnNeighborMethodsITCase.scala         |  126 ++
 flink-libraries/flink-gelly/pom.xml             |   67 +
 .../main/java/org/apache/flink/graph/Edge.java  |   75 +
 .../org/apache/flink/graph/EdgeDirection.java   |   35 +
 .../org/apache/flink/graph/EdgesFunction.java   |   38 +
 .../graph/EdgesFunctionWithVertexValue.java     |   39 +
 .../main/java/org/apache/flink/graph/Graph.java | 1948 ++++++++++++++++++
 .../org/apache/flink/graph/GraphAlgorithm.java  |   30 +
 .../org/apache/flink/graph/GraphCsvReader.java  |  486 +++++
 .../flink/graph/IterationConfiguration.java     |  155 ++
 .../apache/flink/graph/NeighborsFunction.java   |   40 +
 .../graph/NeighborsFunctionWithVertexValue.java |   40 +
 .../apache/flink/graph/ReduceEdgesFunction.java |   34 +
 .../flink/graph/ReduceNeighborsFunction.java    |   35 +
 .../java/org/apache/flink/graph/Triplet.java    |   77 +
 .../java/org/apache/flink/graph/Vertex.java     |   56 +
 .../graph/example/ConnectedComponents.java      |  141 ++
 .../graph/example/EuclideanGraphWeighing.java   |  215 ++
 .../example/GSASingleSourceShortestPaths.java   |  191 ++
 .../flink/graph/example/GraphMetrics.java       |  170 ++
 .../flink/graph/example/IncrementalSSSP.java    |  268 +++
 .../graph/example/JaccardSimilarityMeasure.java |  214 ++
 .../flink/graph/example/MusicProfiles.java      |  302 +++
 .../example/SingleSourceShortestPaths.java      |  199 ++
 .../example/utils/CommunityDetectionData.java   |   95 +
 .../utils/ConnectedComponentsDefaultData.java   |   57 +
 .../graph/example/utils/EuclideanGraphData.java |   86 +
 .../flink/graph/example/utils/ExampleUtils.java |  162 ++
 .../example/utils/IncrementalSSSPData.java      |   95 +
 .../utils/JaccardSimilarityMeasureData.java     |   58 +
 .../example/utils/LabelPropagationData.java     |  114 +
 .../graph/example/utils/MusicProfilesData.java  |  108 +
 .../flink/graph/example/utils/PageRankData.java |   69 +
 .../utils/SingleSourceShortestPathsData.java    |   62 +
 .../graph/example/utils/TriangleCountData.java  |   56 +
 .../apache/flink/graph/gsa/ApplyFunction.java   |  155 ++
 .../flink/graph/gsa/GSAConfiguration.java       |  133 ++
 .../apache/flink/graph/gsa/GatherFunction.java  |  134 ++
 .../graph/gsa/GatherSumApplyIteration.java      |  425 ++++
 .../org/apache/flink/graph/gsa/Neighbor.java    |   45 +
 .../org/apache/flink/graph/gsa/SumFunction.java |  134 ++
 .../flink/graph/library/CommunityDetection.java |  183 ++
 .../graph/library/ConnectedComponents.java      |   97 +
 .../graph/library/GSAConnectedComponents.java   |   88 +
 .../apache/flink/graph/library/GSAPageRank.java |  135 ++
 .../library/GSASingleSourceShortestPaths.java   |  101 +
 .../flink/graph/library/GSATriangleCount.java   |  190 ++
 .../flink/graph/library/LabelPropagation.java   |  116 ++
 .../apache/flink/graph/library/PageRank.java    |  146 ++
 .../library/SingleSourceShortestPaths.java      |  112 +
 .../flink/graph/spargel/MessageIterator.java    |   58 +
 .../flink/graph/spargel/MessagingFunction.java  |  303 +++
 .../spargel/VertexCentricConfiguration.java     |  135 ++
 .../graph/spargel/VertexCentricIteration.java   |  686 ++++++
 .../graph/spargel/VertexUpdateFunction.java     |  253 +++
 .../flink/graph/utils/EdgeToTuple3Map.java      |   35 +
 .../flink/graph/utils/NullValueEdgeMapper.java  |   32 +
 .../flink/graph/utils/Tuple2ToVertexMap.java    |   35 +
 .../flink/graph/utils/Tuple3ToEdgeMap.java      |   41 +
 .../flink/graph/utils/VertexToTuple2Map.java    |   35 +
 .../flink/graph/validation/GraphValidator.java  |   37 +
 .../validation/InvalidVertexIdsValidator.java   |   74 +
 .../apache/flink/graph/gsa/GSACompilerTest.java |  146 ++
 .../flink/graph/gsa/GSATranslationTest.java     |  164 ++
 .../graph/spargel/SpargelCompilerTest.java      |  214 ++
 .../graph/spargel/SpargelTranslationTest.java   |  231 +++
 .../test/CollectionModeSuperstepITCase.java     |   84 +
 .../test/GatherSumApplyConfigurationITCase.java |  404 ++++
 .../flink/graph/test/GatherSumApplyITCase.java  |  106 +
 .../apache/flink/graph/test/TestGraphUtils.java |  417 ++++
 .../test/VertexCentricConfigurationITCase.java  |  689 +++++++
 .../test/example/ConnectedComponentsITCase.java |   71 +
 .../example/EuclideanGraphWeighingITCase.java   |   77 +
 .../test/example/IncrementalSSSPITCase.java     |  133 ++
 .../example/JaccardSimilarityMeasureITCase.java |   72 +
 .../graph/test/example/MusicProfilesITCase.java |  101 +
 .../SingleSourceShortestPathsITCase.java        |   81 +
 .../test/library/CommunityDetectionITCase.java  |   82 +
 ...ctedComponentsWithRandomisedEdgesITCase.java |   94 +
 .../test/library/LabelPropagationITCase.java    |   78 +
 .../graph/test/library/PageRankITCase.java      |  132 ++
 .../graph/test/library/TriangleCountITCase.java |   56 +
 .../graph/test/operations/DegreesITCase.java    |  178 ++
 .../operations/DegreesWithExceptionITCase.java  |  189 ++
 .../test/operations/FromCollectionITCase.java   |  118 ++
 .../test/operations/GraphCreationITCase.java    |  174 ++
 .../operations/GraphCreationWithCsvITCase.java  |  204 ++
 .../GraphCreationWithMapperITCase.java          |  158 ++
 .../test/operations/GraphMutationsITCase.java   |  603 ++++++
 .../test/operations/GraphOperationsITCase.java  |  378 ++++
 .../test/operations/JoinWithEdgesITCase.java    |  532 +++++
 .../test/operations/JoinWithVerticesITCase.java |  214 ++
 .../graph/test/operations/MapEdgesITCase.java   |  210 ++
 .../test/operations/MapVerticesITCase.java      |  219 ++
 .../operations/ReduceOnEdgesMethodsITCase.java  |  615 ++++++
 .../ReduceOnEdgesWithExceptionITCase.java       |  142 ++
 .../ReduceOnNeighborMethodsITCase.java          |  668 ++++++
 .../ReduceOnNeighborsWithExceptionITCase.java   |  203 ++
 flink-libraries/pom.xml                         |   40 +
 flink-staging/flink-gelly-scala/pom.xml         |  204 --
 .../flink/graph/scala/EdgesFunction.scala       |   35 -
 .../scala/EdgesFunctionWithVertexValue.scala    |   33 -
 .../org/apache/flink/graph/scala/Graph.scala    | 1014 ---------
 .../flink/graph/scala/NeighborsFunction.scala   |   37 -
 .../NeighborsFunctionWithVertexValue.scala      |   40 -
 .../scala/example/ConnectedComponents.scala     |  121 --
 .../example/GSASingleSourceShortestPaths.scala  |  156 --
 .../graph/scala/example/GraphMetrics.scala      |  128 --
 .../example/SingleSourceShortestPaths.scala     |  170 --
 .../org/apache/flink/graph/scala/package.scala  |   30 -
 .../graph/scala/utils/EdgeToTuple3Map.scala     |   30 -
 .../graph/scala/utils/Tuple2ToVertexMap.scala   |   30 -
 .../graph/scala/utils/Tuple3ToEdgeMap.scala     |   30 -
 .../graph/scala/utils/VertexToTuple2Map.scala   |   30 -
 .../test/GellyScalaAPICompletenessTest.scala    |   43 -
 .../flink/graph/scala/test/TestGraphUtils.scala |   55 -
 .../scala/test/operations/DegreesITCase.scala   |   69 -
 .../operations/GraphCreationWithCsvITCase.scala |  225 --
 .../test/operations/GraphMutationsITCase.scala  |  260 ---
 .../test/operations/GraphOperationsITCase.scala |  282 ---
 .../test/operations/JoinWithEdgesITCase.scala   |  151 --
 .../operations/JoinWithVerticesITCase.scala     |   74 -
 .../scala/test/operations/MapEdgesITCase.scala  |   81 -
 .../test/operations/MapVerticesITCase.scala     |   76 -
 .../operations/ReduceOnEdgesMethodsITCase.scala |  150 --
 .../ReduceOnNeighborMethodsITCase.scala         |  126 --
 flink-staging/flink-gelly/pom.xml               |   67 -
 .../main/java/org/apache/flink/graph/Edge.java  |   75 -
 .../org/apache/flink/graph/EdgeDirection.java   |   35 -
 .../org/apache/flink/graph/EdgesFunction.java   |   38 -
 .../graph/EdgesFunctionWithVertexValue.java     |   39 -
 .../main/java/org/apache/flink/graph/Graph.java | 1948 ------------------
 .../org/apache/flink/graph/GraphAlgorithm.java  |   30 -
 .../org/apache/flink/graph/GraphCsvReader.java  |  486 -----
 .../flink/graph/IterationConfiguration.java     |  155 --
 .../apache/flink/graph/NeighborsFunction.java   |   40 -
 .../graph/NeighborsFunctionWithVertexValue.java |   40 -
 .../apache/flink/graph/ReduceEdgesFunction.java |   34 -
 .../flink/graph/ReduceNeighborsFunction.java    |   35 -
 .../java/org/apache/flink/graph/Triplet.java    |   77 -
 .../java/org/apache/flink/graph/Vertex.java     |   56 -
 .../graph/example/ConnectedComponents.java      |  141 --
 .../graph/example/EuclideanGraphWeighing.java   |  215 --
 .../example/GSASingleSourceShortestPaths.java   |  191 --
 .../flink/graph/example/GraphMetrics.java       |  170 --
 .../flink/graph/example/IncrementalSSSP.java    |  268 ---
 .../graph/example/JaccardSimilarityMeasure.java |  214 --
 .../flink/graph/example/MusicProfiles.java      |  302 ---
 .../example/SingleSourceShortestPaths.java      |  199 --
 .../example/utils/CommunityDetectionData.java   |   95 -
 .../utils/ConnectedComponentsDefaultData.java   |   57 -
 .../graph/example/utils/EuclideanGraphData.java |   86 -
 .../flink/graph/example/utils/ExampleUtils.java |  162 --
 .../example/utils/IncrementalSSSPData.java      |   95 -
 .../utils/JaccardSimilarityMeasureData.java     |   58 -
 .../example/utils/LabelPropagationData.java     |  114 -
 .../graph/example/utils/MusicProfilesData.java  |  108 -
 .../flink/graph/example/utils/PageRankData.java |   69 -
 .../utils/SingleSourceShortestPathsData.java    |   62 -
 .../graph/example/utils/TriangleCountData.java  |   56 -
 .../apache/flink/graph/gsa/ApplyFunction.java   |  155 --
 .../flink/graph/gsa/GSAConfiguration.java       |  133 --
 .../apache/flink/graph/gsa/GatherFunction.java  |  134 --
 .../graph/gsa/GatherSumApplyIteration.java      |  425 ----
 .../org/apache/flink/graph/gsa/Neighbor.java    |   45 -
 .../org/apache/flink/graph/gsa/SumFunction.java |  134 --
 .../flink/graph/library/CommunityDetection.java |  183 --
 .../graph/library/ConnectedComponents.java      |   97 -
 .../graph/library/GSAConnectedComponents.java   |   88 -
 .../apache/flink/graph/library/GSAPageRank.java |  135 --
 .../library/GSASingleSourceShortestPaths.java   |  101 -
 .../flink/graph/library/GSATriangleCount.java   |  190 --
 .../flink/graph/library/LabelPropagation.java   |  116 --
 .../apache/flink/graph/library/PageRank.java    |  146 --
 .../library/SingleSourceShortestPaths.java      |  112 -
 .../flink/graph/spargel/MessageIterator.java    |   58 -
 .../flink/graph/spargel/MessagingFunction.java  |  303 ---
 .../spargel/VertexCentricConfiguration.java     |  135 --
 .../graph/spargel/VertexCentricIteration.java   |  686 ------
 .../graph/spargel/VertexUpdateFunction.java     |  253 ---
 .../flink/graph/utils/EdgeToTuple3Map.java      |   35 -
 .../flink/graph/utils/NullValueEdgeMapper.java  |   32 -
 .../flink/graph/utils/Tuple2ToVertexMap.java    |   35 -
 .../flink/graph/utils/Tuple3ToEdgeMap.java      |   41 -
 .../flink/graph/utils/VertexToTuple2Map.java    |   35 -
 .../flink/graph/validation/GraphValidator.java  |   37 -
 .../validation/InvalidVertexIdsValidator.java   |   74 -
 .../apache/flink/graph/gsa/GSACompilerTest.java |  146 --
 .../flink/graph/gsa/GSATranslationTest.java     |  164 --
 .../graph/spargel/SpargelCompilerTest.java      |  214 --
 .../graph/spargel/SpargelTranslationTest.java   |  231 ---
 .../test/CollectionModeSuperstepITCase.java     |   84 -
 .../test/GatherSumApplyConfigurationITCase.java |  404 ----
 .../flink/graph/test/GatherSumApplyITCase.java  |  106 -
 .../apache/flink/graph/test/TestGraphUtils.java |  417 ----
 .../test/VertexCentricConfigurationITCase.java  |  689 -------
 .../test/example/ConnectedComponentsITCase.java |   71 -
 .../example/EuclideanGraphWeighingITCase.java   |   77 -
 .../test/example/IncrementalSSSPITCase.java     |  133 --
 .../example/JaccardSimilarityMeasureITCase.java |   72 -
 .../graph/test/example/MusicProfilesITCase.java |  101 -
 .../SingleSourceShortestPathsITCase.java        |   81 -
 .../test/library/CommunityDetectionITCase.java  |   82 -
 ...ctedComponentsWithRandomisedEdgesITCase.java |   94 -
 .../test/library/LabelPropagationITCase.java    |   78 -
 .../graph/test/library/PageRankITCase.java      |  132 --
 .../graph/test/library/TriangleCountITCase.java |   56 -
 .../graph/test/operations/DegreesITCase.java    |  178 --
 .../operations/DegreesWithExceptionITCase.java  |  189 --
 .../test/operations/FromCollectionITCase.java   |  118 --
 .../test/operations/GraphCreationITCase.java    |  174 --
 .../operations/GraphCreationWithCsvITCase.java  |  204 --
 .../GraphCreationWithMapperITCase.java          |  158 --
 .../test/operations/GraphMutationsITCase.java   |  603 ------
 .../test/operations/GraphOperationsITCase.java  |  378 ----
 .../test/operations/JoinWithEdgesITCase.java    |  532 -----
 .../test/operations/JoinWithVerticesITCase.java |  214 --
 .../graph/test/operations/MapEdgesITCase.java   |  210 --
 .../test/operations/MapVerticesITCase.java      |  219 --
 .../operations/ReduceOnEdgesMethodsITCase.java  |  615 ------
 .../ReduceOnEdgesWithExceptionITCase.java       |  142 --
 .../ReduceOnNeighborMethodsITCase.java          |  668 ------
 .../ReduceOnNeighborsWithExceptionITCase.java   |  203 --
 flink-staging/pom.xml                           |    2 -
 pom.xml                                         |    1 +
 252 files changed, 21696 insertions(+), 21657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 766b395..f8e4f28 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -30,7 +30,7 @@ Gelly is a Graph API for Flink. It contains a set of methods and utilities which
 Using Gelly
 -----------
 
-Gelly is currently part of the *staging* Maven project. All relevant classes are located in the *org.apache.flink.graph* package.
+Gelly is currently part of the *libraries* Maven project. All relevant classes are located in the *org.apache.flink.graph* package.
 
 Add the following dependency to your `pom.xml` to use Gelly.
 
@@ -57,7 +57,7 @@ Add the following dependency to your `pom.xml` to use Gelly.
 
 Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here](../apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API. After reading this guide, you might also want to check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ "Gelly examples" %}.
+The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API. After reading this guide, you might also want to check the {% gh_link /flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ "Gelly examples" %}.
 
 Graph Representation
 -----------
@@ -1194,7 +1194,7 @@ final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
 
 Note that `gather` takes a `Neighbor` type as an argument. This is a convenience type which simply wraps a vertex with its neighboring edge.
 
-For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java "GSAPageRank" %} and {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java "GSAConnectedComponents" %} library methods of Gelly.
+For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the {% gh_link /flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java "GSAPageRank" %} and {% gh_link /flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java "GSAConnectedComponents" %} library methods of Gelly.
 
 [Back to top](#top)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
new file mode 100644
index 0000000..90d2971
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-gelly-scala</artifactId>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-gelly</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency> <!-- test-scoped test-jar of flink-tests -->
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Scala Compiler -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.1.4</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                        scala classes can be resolved later in the (Java) compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <jvmArgs>
+                        <jvmArg>-Xms128m</jvmArg>
+                        <jvmArg>-Xmx512m</jvmArg>
+                    </jvmArgs>
+                    <compilerPlugins combine.children="append">
+                        <compilerPlugin>
+                            <groupId>org.scalamacros</groupId>
+                            <artifactId>paradise_${scala.version}</artifactId>
+                            <version>${scala.macros.version}</version>
+                        </compilerPlugin>
+                    </compilerPlugins>
+                </configuration>
+            </plugin>
+
+            <!-- Eclipse Integration -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-eclipse-plugin</artifactId>
+                <version>2.8</version>
+                <configuration>
+                    <downloadSources>true</downloadSources>
+                    <projectnatures>
+                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                    </projectnatures>
+                    <buildcommands>
+                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                    </buildcommands>
+                    <classpathContainers>
+                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                    </classpathContainers>
+                    <excludes>
+                        <exclude>org.scala-lang:scala-library</exclude>
+                        <exclude>org.scala-lang:scala-compiler</exclude>
+                    </excludes>
+                    <sourceIncludes>
+                        <sourceInclude>**/*.scala</sourceInclude>
+                        <sourceInclude>**/*.java</sourceInclude>
+                    </sourceIncludes>
+                </configuration>
+            </plugin>
+
+            <!-- Adding scala source directories to build path -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.7</version>
+                <executions>
+                    <!-- Add src/main/scala to eclipse build path -->
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <!-- Add src/test/scala to eclipse build path -->
+                    <execution>
+                        <id>add-test-source</id>
+                        <phase>generate-test-sources</phase>
+                        <goals>
+                            <goal>add-test-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/test/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+                <version>0.5.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <verbose>false</verbose>
+                    <failOnViolation>true</failOnViolation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnWarning>false</failOnWarning>
+                    <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                    <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                    <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                    <outputEncoding>UTF-8</outputEncoding>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
new file mode 100644
index 0000000..70a5fdf
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/** Scala-facing variant of the Java `EdgesFunction`: edges arrive as Scala `(K, Edge)` pairs. */
+abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
+  /** Implemented by subclasses; receives the converted `(K, Edge)` pairs and collector `out`. */
+  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
+  override def iterateEdges(edges: java.lang.Iterable[Tuple2[K, Edge[K, EV]]],
+                            out: Collector[T]): Unit = { // Java-side overload: convert, delegate
+    val scalaPairs = scala.collection.JavaConverters.iterableAsScalaIterableConverter(edges)
+      .asScala.map(jtuple => (jtuple.f0, jtuple.f1))
+    iterateEdges(scalaPairs, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
new file mode 100644
index 0000000..82589b6
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+/** Scala-facing variant of the Java `EdgesFunctionWithVertexValue` (Scala `Iterable` edges). */
+abstract class EdgesFunctionWithVertexValue[K, VV, EV, T]
+  extends org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {
+  @throws(classOf[Exception])
+  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
+  override def iterateEdges(v: Vertex[K, VV], edges: java.lang.Iterable[Edge[K, EV]],
+                            out: Collector[T]): Unit = // Java-side overload: wrap, then delegate
+    iterateEdges(v,
+      scala.collection.JavaConverters.iterableAsScalaIterableConverter(edges).asScala, out)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
new file mode 100644
index 0000000..28f3f12
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{tuple => jtuple}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.validation.GraphValidator
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
+import org.apache.flink.{graph => jg}
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.reflect.ClassTag
+import org.apache.flink.types.NullValue
+
+object Graph {
+
+  /**
+  * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+  */
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      vertices: DataSet[Vertex[K, VV]],
+      edges: DataSet[Edge[K, EV]],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // Unwrap the Scala wrappers and delegate to the Java Graph API.
+    val javaGraph = jg.Graph.fromDataSet[K, VV, EV](
+      vertices.javaSet, edges.javaSet, env.getJavaEnv)
+    wrapGraph(javaGraph)
+  }
+
+  /**
+  * Creates a Graph from a DataSet of edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
+      edges: DataSet[Edge[K, EV]],
+      env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    // Delegates to the Java overload that takes only the edge DataSet.
+    val javaGraph = jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv)
+    wrapGraph(javaGraph)
+  }
+
+  /**
+  * Creates a graph from a DataSet of edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      edges: DataSet[Edge[K, EV]],
+      mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // The mapper is handed to the Java API unchanged.
+    val javaGraph = jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv)
+    wrapGraph(javaGraph)
+  }
+
+  /**
+  * Creates a Graph from a Seq of vertices and a Seq of edges.
+  */
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      vertices: Seq[Vertex[K, VV]],
+      edges: Seq[Edge[K, EV]],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // Expose both Seqs as Java collections before delegating.
+    val javaVertices = vertices.asJavaCollection
+    val javaEdges = edges.asJavaCollection
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](javaVertices, javaEdges, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a Seq of edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
+      edges: Seq[Edge[K, EV]],
+      env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    // Expose the Seq as a Java collection before delegating.
+    val javaEdges = edges.asJavaCollection
+    wrapGraph(jg.Graph.fromCollection[K, EV](javaEdges, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a graph from a Seq of edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      edges: Seq[Edge[K, EV]],
+      mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // Expose the Seq as a Java collection; the mapper is passed through unchanged.
+    val javaEdges = edges.asJavaCollection
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](javaEdges, mapper, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from DataSets of Tuples representing the vertices and the edges.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      vertices: DataSet[(K, VV)],
+      edges: DataSet[(K, K, EV)],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // Scala tuples are turned into Flink's Java tuples before delegating.
+    val vertexTuples = vertices.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    val edgeTuples = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](vertexTuples, edgeTuples, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuples representing the edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
+      edges: DataSet[(K, K, EV)],
+      env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    // Scala triples are turned into Flink's Java Tuple3 before delegating.
+    val edgeTuples = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, EV](edgeTuples, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuples representing the edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      edges: DataSet[(K, K, EV)],
+      mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // Scala triples are turned into Flink's Java Tuple3; the mapper is passed through.
+    val edgeTuples = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](edgeTuples, mapper, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a CSV file of vertices and a CSV file of edges.
+  *
+  * Depending on readVertices, hasEdgeValues and mapper, the vertex and edge values of the
+  * resulting graph are either read from the files, produced by the mapper, or set to
+  * NullValue. The result type is therefore left to be inferred from the four branches.
+  *
+  * @param env The Execution Environment.
+  * @param pathEdges The file path containing the edges.
+  * @param readVertices Defines whether the vertices have associated values.
+  * If set to false, the vertex input is ignored and vertices are created from the edges file.
+  * @param pathVertices The file path containing the vertices.
+  * Must be non-null when readVertices is true.
+  * @param hasEdgeValues Defines whether the edges have associated values. True by default.
+  * @param lineDelimiterVertices The string that separates lines in the vertices file.
+  * It defaults to newline.
+  * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values
+  * in the vertices file.
+  * @param quoteCharacterVertices The character to use for quoted String parsing
+  * in the vertices file. Disabled by default.
+  * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
+  * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
+  * are ignored, disabled by default.
+  * @param lenientVertices Whether the parser should silently ignore malformed lines in the
+  * vertices file.
+  * @param includedFieldsVertices The fields in the vertices file that should be read.
+  * By default all fields are read.
+  * @param lineDelimiterEdges The string that separates lines in the edges file.
+  * It defaults to newline.
+  * @param fieldDelimiterEdges The string that separates fields in the edges file.
+  * @param quoteCharacterEdges The character to use for quoted String parsing
+  * in the edges file. Disabled by default.
+  * @param ignoreFirstLineEdges Whether the first line in the edges file should be ignored.
+  * @param ignoreCommentsEdges Lines that start with the given String in the edges file
+  * are ignored, disabled by default.
+  * @param lenientEdges Whether the parser should silently ignore malformed lines in the
+  * edges file.
+  * @param includedFieldsEdges The fields in the edges file that should be read.
+  * By default all fields are read.
+  * @param mapper If no vertex values are provided, this mapper can be used to initialize them.
+  * @return the Graph built from the given CSV input.
+  * @throws IllegalArgumentException if readVertices is true but pathVertices is null.
+  */
+  // scalastyle:off
+  // This method exceeds the max allowed number of parameters -->
+  def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      env: ExecutionEnvironment,
+      pathEdges: String,
+      readVertices: Boolean,
+      pathVertices: String = null,
+      hasEdgeValues: Boolean = true,
+      lineDelimiterVertices: String = "\n",
+      fieldDelimiterVertices: String = ",",
+      quoteCharacterVertices: Character = null,
+      ignoreFirstLineVertices: Boolean = false,
+      ignoreCommentsVertices: String = null,
+      lenientVertices: Boolean = false,
+      includedFieldsVertices: Array[Int] = null,
+      lineDelimiterEdges: String = "\n",
+      fieldDelimiterEdges: String = ",",
+      quoteCharacterEdges: Character = null,
+      ignoreFirstLineEdges: Boolean = false,
+      ignoreCommentsEdges: String = null,
+      lenientEdges: Boolean = false,
+      includedFieldsEdges: Array[Int] = null,
+      mapper: MapFunction[K, VV] = null) = {
+
+    // with vertex and edge values
+    if (readVertices && hasEdgeValues) {
+      // `==` is null-safe; calling equals on a null path would raise a NullPointerException
+      // before the intended IllegalArgumentException could be thrown
+      if (pathVertices == null) {
+        throw new IllegalArgumentException(
+            "The vertices file path must be specified when readVertices is true.")
+      } else {
+        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
+            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
+            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
+
+        val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+            includedFieldsEdges)
+
+        fromTupleDataSet[K, VV, EV](vertices, edges, env)
+      }
+    }
+    // with vertex value and no edge value
+    else if (readVertices && (!hasEdgeValues)) {
+      // same null-safe check as in the branch above
+      if (pathVertices == null) {
+        throw new IllegalArgumentException(
+            "The vertices file path must be specified when readVertices is true.")
+      } else {
+        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
+            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
+            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
+
+        // edges carry no value in the file, so NullValue is attached to each of them
+        val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+            includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
+
+        fromTupleDataSet[K, VV, NullValue](vertices, edges, env)
+      }
+    }
+    // with edge value and no vertex value
+    else if ((!readVertices) && hasEdgeValues) {
+      val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+        quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+        includedFieldsEdges)
+
+      // initializer provided
+      if (mapper != null) {
+        fromTupleDataSet[K, VV, EV](edges, mapper, env)
+      }
+      else {
+        fromTupleDataSet[K, EV](edges, env)
+      }
+    }
+    // with no edge value and no vertex value
+    else {
+      // edges carry no value in the file, so NullValue is attached to each of them
+      val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+      quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges,
+      lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
+
+      // initializer provided
+      if (mapper != null) {
+        fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
+      }
+      else {
+        fromTupleDataSet[K, NullValue](edges, env)
+      }
+    }
+  }
+// scalastyle:on
+
+}
+
+/**
+ * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ * @param jgraph the underlying java api Graph.
+ * @tparam K the key type for vertex and edge identifiers
+ * @tparam VV the value type for vertices
+ * @tparam EV the value type for edges
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ */
+final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
+
+  // Gives Flink-internal code access to the wrapped Java graph.
+  private[flink] def getWrappedGraph: jg.Graph[K, VV, EV] = jgraph
+
+
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    // Closure cleaning only runs when it is enabled in the execution config.
+    val cleanerEnabled = jgraph.getContext.getConfig.isClosureCleanerEnabled
+    if (cleanerEnabled) {
+      ClosureCleaner.clean(f, checkSerializable)
+    }
+    // Serializability is verified in every case, also when the cleaner is switched off.
+    ClosureCleaner.ensureSerializable(f)
+    f
+  }
+
+  /**
+   * @return the vertex DataSet.
+   */
+  def getVertices: DataSet[Vertex[K, VV]] = {
+    // Wrap the Java DataSet in its Scala counterpart.
+    wrap(jgraph.getVertices)
+  }
+
+  /**
+   * @return the edge DataSet.
+   */
+  def getEdges: DataSet[Edge[K, EV]] = {
+    // Wrap the Java DataSet in its Scala counterpart.
+    wrap(jgraph.getEdges)
+  }
+
+  /**
+   * @return the vertex DataSet as Tuple2.
+   */
+  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
+    // Convert each Java Tuple2 into the equivalent Scala pair.
+    val javaTuples = wrap(jgraph.getVerticesAsTuple2)
+    javaTuples.map(t => (t.f0, t.f1))
+  }
+
+  /**
+   * @return the edge DataSet as Tuple3.
+   */
+  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
+    // Convert each Java Tuple3 into the equivalent Scala triple.
+    val javaTuples = wrap(jgraph.getEdgesAsTuple3)
+    javaTuples.map(t => (t.f0, t.f1, t.f2))
+  }
+
+  /**
+  * @return a DataSet of Triplets,
+  * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+  */
+  def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
+    val javaTriplets = jgraph.getTriplets()
+    wrap(javaTriplets)
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](
+      mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV] = {
+    // The Java API needs the type information of the resulting vertices explicitly.
+    val vertexType = createTypeInformation[Vertex[K, NV]]
+    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, vertexType))
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param fun the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV] = {
+    // Adapt the Scala function to Flink's MapFunction interface.
+    val vertexMapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, VV], NV] {
+      val cleanedFun = clean(fun)
+
+      override def map(vertex: Vertex[K, VV]): NV = cleanedFun(vertex)
+    }
+    val mapped = jgraph.mapVertices[NV](vertexMapper, createTypeInformation[Vertex[K, NV]])
+    new Graph[K, NV, EV](mapped)
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](
+      mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV] = {
+    // The Java API needs the type information of the resulting edges explicitly.
+    val edgeType = createTypeInformation[Edge[K, NV]]
+    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, edgeType))
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param fun the map function to apply.
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV] = {
+    // Adapt the Scala function to Flink's MapFunction interface.
+    val edgeMapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, EV], NV] {
+      val cleanedFun = clean(fun)
+
+      override def map(edge: Edge[K, EV]): NV = cleanedFun(edge)
+    }
+    val mapped = jgraph.mapEdges[NV](edgeMapper, createTypeInformation[Edge[K, NV]])
+    new Graph[K, VV, NV](mapped)
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      mapper: MapFunction[(VV, T), VV]): Graph[K, VV, EV] = {
+    // The Java API passes Java tuples to the UDF; re-pack them as Scala pairs for `mapper`.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      override def map(joined: jtuple.Tuple2[VV, T]): VV = mapper.map((joined.f0, joined.f1))
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      fun: (VV, T) => VV): Graph[K, VV, EV] = {
+    // Adapt the Scala function to a MapFunction over Java tuples.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      val cleanedFun = clean(fun)
+
+      override def map(joined: jtuple.Tuple2[VV, T]): VV = cleanedFun(joined.f0, joined.f1)
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](
+      inputDataSet: DataSet[(K, K, T)],
+      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    // The Java API passes Java tuples to the UDF; re-pack them as Scala pairs for `mapper`.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = mapper.map((joined.f0, joined.f1))
+    }
+    val javaInput = inputDataSet.map(t => new jtuple.Tuple3(t._1, t._2, t._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](
+      inputDataSet: DataSet[(K, K, T)],
+      fun: (EV, T) => EV): Graph[K, VV, EV] = {
+    // Adapt the Scala function to a MapFunction over Java tuples.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanedFun = clean(fun)
+
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanedFun(joined.f0, joined.f1)
+    }
+    val javaInput = inputDataSet.map(t => new jtuple.Tuple3(t._1, t._2, t._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    // The Java API passes Java tuples to the UDF; re-pack them as Scala pairs for `mapper`.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = mapper.map((joined.f0, joined.f1))
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      fun: (EV, T) => EV): Graph[K, VV, EV] = {
+    // Adapt the Scala function to a MapFunction over Java tuples.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanedFun = clean(fun)
+
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanedFun(joined.f0, joined.f1)
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      mapper: MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    // The Java API passes Java tuples to the UDF; re-pack them as Scala pairs for `mapper`.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = mapper.map((joined.f0, joined.f1))
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](
+      inputDataSet: DataSet[(K, T)],
+      fun: (EV, T) => EV): Graph[K, VV, EV] = {
+    // Adapt the Scala function to a MapFunction over Java tuples.
+    val tupleMapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanedFun = clean(fun)
+
+      override def map(joined: jtuple.Tuple2[EV, T]): EV = cleanedFun(joined.f0, joined.f1)
+    }
+    val javaInput = inputDataSet.map(p => new jtuple.Tuple2(p._1, p._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaInput, tupleMapper))
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(
+      vertexFilter: FilterFunction[Vertex[K, VV]],
+      edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV] = {
+    // Both filters are handed to the Java API unchanged.
+    val filtered = jgraph.subgraph(vertexFilter, edgeFilter)
+    wrapGraph(filtered)
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(
+      vertexFilterFun: Vertex[K, VV] => Boolean,
+      edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV] = {
+    // Wrap each Scala predicate in a Flink FilterFunction.
+    val vertexPredicate = new FilterFunction[Vertex[K, VV]] {
+      val cleanedVertexFun = clean(vertexFilterFun)
+
+      override def filter(vertex: Vertex[K, VV]): Boolean = cleanedVertexFun(vertex)
+    }
+    val edgePredicate = new FilterFunction[Edge[K, EV]] {
+      val cleanedEdgeFun = clean(edgeFilterFun)
+
+      override def filter(edge: Edge[K, EV]): Boolean = cleanedEdgeFun(edge)
+    }
+    wrapGraph(jgraph.subgraph(vertexPredicate, edgePredicate))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    val filtered = jgraph.filterOnVertices(vertexFilter)
+    wrapGraph(filtered)
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV] = {
+    // Wrap the Scala predicate in a Flink FilterFunction.
+    val vertexPredicate = new FilterFunction[Vertex[K, VV]] {
+      val cleanedVertexFun = clean(vertexFilterFun)
+
+      override def filter(vertex: Vertex[K, VV]): Boolean = cleanedVertexFun(vertex)
+    }
+    wrapGraph(jgraph.filterOnVertices(vertexPredicate))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV] = {
+    val filtered = jgraph.filterOnEdges(edgeFilter)
+    wrapGraph(filtered)
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV] = {
+    // Wrap the Scala predicate in a Flink FilterFunction.
+    val edgePredicate = new FilterFunction[Edge[K, EV]] {
+      val cleanedEdgeFun = clean(edgeFilterFun)
+
+      override def filter(edge: Edge[K, EV]): Boolean = cleanedEdgeFun(edge)
+    }
+    wrapGraph(jgraph.filterOnEdges(edgePredicate))
+  }
+
+  /**
+   * Return the in-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, inDegree>
+   */
+  def inDegrees(): DataSet[(K, Long)] = {
+    // Convert each Java Tuple2 into a Scala pair.
+    val javaDegrees = wrap(jgraph.inDegrees)
+    javaDegrees.map(entry => (entry.f0, entry.f1))
+  }
+
+  /**
+   * Return the out-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, outDegree>
+   */
+  def outDegrees(): DataSet[(K, Long)] = {
+    // Convert each Java Tuple2 into a Scala pair.
+    val javaDegrees = wrap(jgraph.outDegrees)
+    javaDegrees.map(entry => (entry.f0, entry.f1))
+  }
+
+  /**
+   * Return the degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, degree>
+   */
+  def getDegrees(): DataSet[(K, Long)] = {
+    // Convert each Java Tuple2 into a Scala pair.
+    val javaDegrees = wrap(jgraph.getDegrees)
+    javaDegrees.map(entry => (entry.f0, entry.f1))
+  }
+
+  /**
+   * This operation adds all inverse-direction edges to the graph.
+   *
+   * @return the undirected graph.
+   */
+  def getUndirected(): Graph[K, VV, EV] = {
+    val undirected = jgraph.getUndirected
+    new Graph[K, VV, EV](undirected)
+  }
+
+  /**
+   * Reverse the direction of the edges in the graph
+   *
+   * @return a new graph with all edges reversed
+   * @throws UnsupportedOperationException
+   */
+  def reverse(): Graph[K, VV, EV] = {
+    val reversed = jgraph.reverse()
+    new Graph[K, VV, EV](reversed)
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. The function applied
+   * on the edges has access to the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction     the edge direction (in-, out-, all-)
+   * @tparam T           the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](
+      edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    // The result type information is passed to the Java API explicitly.
+    val resultType = createTypeInformation[T]
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, resultType))
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. The function applied
+   * on the edges has access to the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction     the edge direction (in-, out-, all-)
+   * @tparam T           the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](
+      edgesFunction: EdgesFunction[K, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    // The result type information is passed to the Java API explicitly.
+    val resultType = createTypeInformation[T]
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, resultType))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex. The function applied on the neighbors has access to the vertex
+   * value.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction         the edge direction (in-, out-, all-)
+   * @tparam T               the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
+      neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    // The result type information is passed to the Java API explicitly.
+    val resultType = createTypeInformation[T]
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, resultType))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction         the edge direction (in-, out-, all-)
+   * @tparam T               the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
+      neighborsFunction: NeighborsFunction[K, VV, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    // The result type information is passed to the Java API explicitly.
+    val resultType = createTypeInformation[T]
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, resultType))
+  }
+
+  /**
+   * @return a long integer representing the number of vertices
+   */
+  def numberOfVertices(): Long = jgraph.numberOfVertices() // delegates to the Java graph
+
+  /**
+   * @return a long integer representing the number of edges
+   */
+  def numberOfEdges(): Long = jgraph.numberOfEdges() // delegates to the Java graph
+
+  /**
+   * @return The IDs of the vertices as DataSet
+   */
+  def getVertexIds(): DataSet[K] = {
+    val javaIds = jgraph.getVertexIds
+    wrap(javaIds)
+  }
+
+  /**
+   * @return The IDs of the edges as DataSet
+   */
+  def getEdgeIds(): DataSet[(K, K)] = {
+    // Convert each Java Tuple2 into a Scala pair.
+    val javaIds = wrap(jgraph.getEdgeIds)
+    javaIds.map(ids => (ids.f0, ids.f1))
+  }
+
+  /**
+   * Adds the input vertex to the graph. If the vertex already
+   * exists in the graph, it will not be added again.
+   *
+   * @param vertex the vertex to be added
+   * @return the new graph containing the existing vertices as well as the one just added
+   */
+  def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] = {
+    val extended = jgraph.addVertex(vertex)
+    wrapGraph(extended)
+  }
+
+  /**
+  * Adds the list of vertices, passed as input, to the graph.
+  * If the vertices already exist in the graph, they will not be added once more.
+  *
+  * @param vertices the list of vertices to add
+  * @return the new graph containing the existing and newly added vertices
+  */
+  def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    // The Java API expects a java.util.List.
+    val javaVertices = vertices.asJava
+    wrapGraph(jgraph.addVertices(javaVertices))
+  }
+
+  /**
+  * Adds the given list of edges to the graph.
+  *
+  * When adding an edge for a non-existing set of vertices,
+  * the edge is considered invalid and ignored.
+  *
+  * @param edges the list of edges to be added
+  * @return a new graph containing the existing edges plus the newly added edges.
+  */
+  def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] =
+    // convert the list with asJava before handing it to jgraph
+    wrapGraph(jgraph.addEdges(edges.asJava))
+
+  /**
+   * Adds the given edge to the graph. If the source and target vertices do
+   * not exist in the graph, they will also be added.
+   *
+   * @param source the source vertex of the edge
+   * @param target the target vertex of the edge
+   * @param edgeValue the edge value
+   * @return the new graph containing the existing vertices and edges plus the
+   *         newly added edge
+   */
+  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV] =
+    // result type spelled out, as on addEdges, instead of being inferred
+    wrapGraph(jgraph.addEdge(source, target, edgeValue))
+
+  /**
+   * Removes the given vertex and its edges from the graph.
+   *
+   * @param vertex the vertex to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed vertex and its edges
+   */
+  def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] =
+    // result type spelled out, as on removeVertices, instead of being inferred
+    wrapGraph(jgraph.removeVertex(vertex))
+
+  /**
+   * Removes the given list of vertices and their edges from the graph.
+   *
+   * @param vertices the list of vertices to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed vertices and their edges
+   */
+  def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] =
+    // convert the list with asJava before handing it to jgraph
+    wrapGraph(jgraph.removeVertices(vertices.asJava))
+
+  /**
+   * Removes all edges that match the given edge from the graph.
+   *
+   * @param edge the edge to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed edges
+   */
+  def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV] =
+    // result type spelled out, as on removeEdges, instead of being inferred
+    wrapGraph(jgraph.removeEdge(edge))
+
+  /**
+   * Removes all the edges that match the edges in the given list from the graph.
+   *
+   * @param edges the list of edges to be removed
+   * @return a new graph where the edges have been removed and in which the vertices remained intact
+   */
+  def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] =
+    // convert the list with asJava before handing it to jgraph
+    wrapGraph(jgraph.removeEdges(edges.asJava))
+
+  /**
+   * Performs union on the vertices and edges sets of the input graphs
+   * removing duplicate vertices but maintaining duplicate edges.
+   *
+   * @param graph the graph to perform union with
+   * @return a new graph
+   */
+  def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV] =
+    // result type spelled out instead of inferred; jgraph works on the wrapped graph
+    wrapGraph(jgraph.union(graph.getWrappedGraph))
+
+  /**
+  * Performs difference on the vertex and edge sets of the input graphs,
+  * removing common vertices and edges. If a source/target vertex is removed,
+  * its corresponding edge will also be removed.
+  * @param graph the graph to perform difference with
+  * @return a new graph where the common vertices and edges have been removed
+  */
+  def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV] =
+    // result type spelled out instead of inferred; jgraph works on the wrapped graph
+    wrapGraph(jgraph.difference(graph.getWrappedGraph))
+
+  /**
+   * Compute an aggregate over the neighbor values of each
+   * vertex.
+   *
+   * @param reduceNeighborsFunction the function to apply to the neighborhood
+   * @param direction               the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+   */
+  def reduceOnNeighbors(
+      reduceNeighborsFunction: ReduceNeighborsFunction[VV],
+      direction: EdgeDirection): DataSet[(K, VV)] =
+    wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction))
+      .map(entry => (entry.f0, entry.f1))
+
+  /**
+   * Compute an aggregate over the edge values of each vertex.
+   *
+   * @param reduceEdgesFunction the function to apply to the neighborhood
+   * @param direction           the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex (vertex key, aggregate edge value)
+   * @throws IllegalArgumentException
+   */
+  def reduceOnEdges(
+      reduceEdgesFunction: ReduceEdgesFunction[EV],
+      direction: EdgeDirection): DataSet[(K, EV)] =
+    wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction))
+      .map(entry => (entry.f0, entry.f1))
+
+  def run[T: TypeInformation : ClassTag](
+      algorithm: GraphAlgorithm[K, VV, EV, T]): T =
+    // hand the algorithm to jgraph and return whatever it produces
+    jgraph.run(algorithm)
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param vertexUpdateFunction the vertex update function
+   * @param messagingFunction the messaging function
+   * @param maxIterations maximum number of iterations to perform
+   *
+   * @return the updated Graph after the vertex-centric iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](
+      vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+      messagingFunction: MessagingFunction[K, VV, M, EV],
+      maxIterations: Int): Graph[K, VV, EV] =
+    wrapGraph(
+      jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction, maxIterations))
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph with configuration options.
+   *
+   * @param vertexUpdateFunction the vertex update function
+   * @param messagingFunction the messaging function
+   * @param maxIterations maximum number of iterations to perform
+   * @param parameters the iteration configuration parameters
+   *
+   * @return the updated Graph after the vertex-centric iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](
+      vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+      messagingFunction: MessagingFunction[K, VV, M, EV],
+      maxIterations: Int,
+      parameters: VertexCentricConfiguration): Graph[K, VV, EV] =
+    wrapGraph(jgraph.runVertexCentricIteration(
+      vertexUpdateFunction, messagingFunction, maxIterations, parameters))
+
+  /**
+   * Runs a Gather-Sum-Apply iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param gatherFunction the gather function collects information about adjacent
+   *                       vertices and edges
+   * @param sumFunction the sum function aggregates the gathered information
+   * @param applyFunction the apply function updates the vertex values with the aggregates
+   * @param maxIterations maximum number of iterations to perform
+   * @tparam M the intermediate type used between gather, sum and apply
+   *
+   * @return the updated Graph after the gather-sum-apply iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runGatherSumApplyIteration[M](
+      gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M],
+      applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runGatherSumApplyIteration(
+      gatherFunction, sumFunction, applyFunction, maxIterations))
+  }
+
+  /**
+   * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
+   *
+   * @param gatherFunction the gather function collects information about adjacent
+   *                       vertices and edges
+   * @param sumFunction the sum function aggregates the gathered information
+   * @param applyFunction the apply function updates the vertex values with the aggregates
+   * @param maxIterations maximum number of iterations to perform
+   * @param parameters the iteration configuration parameters
+   * @tparam M the intermediate type used between gather, sum and apply
+   *
+   * @return the updated Graph after the gather-sum-apply iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runGatherSumApplyIteration[M](
+      gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M],
+      applyFunction: ApplyFunction[K, VV, M], maxIterations: Int,
+      parameters: GSAConfiguration): Graph[K, VV, EV] =
+    wrapGraph(jgraph.runGatherSumApplyIteration(
+      gatherFunction, sumFunction, applyFunction, maxIterations, parameters))
+
+  def validate(validator: GraphValidator[K, VV, EV]): Boolean =
+    // validation itself is delegated to jgraph
+    jgraph.validate(validator)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
new file mode 100644
index 0000000..ca15dab
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple3
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+/**
+ * Scala variant of [[org.apache.flink.graph.NeighborsFunction]]: implementations receive
+ * the neighborhood as a Scala Iterable of Scala triples instead of a java.lang.Iterable
+ * of Tuple3 objects.
+ *
+ * @tparam T the type of the elements that can be emitted through the collector
+ */
+abstract class NeighborsFunction[K, VV, EV, T]
+  extends org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {
+
+  /**
+   * Callback to implement on the Scala side.
+   *
+   * @param neighbors the neighborhood as (f0, edge, vertex) triples, converted from the
+   *                  Tuple3 elements of the java.lang.Iterable variant
+   * @param out       the collector used to emit results
+   */
+  def iterateNeighbors(
+      neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
+      out: Collector[T]): Unit
+
+  /**
+   * Converts every Tuple3 into a Scala triple and forwards the call to the Scala callback.
+   * The result type is declared explicitly (Unit) rather than left to inference.
+   */
+  override def iterateNeighbors(
+      neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K, VV]]],
+      out: Collector[T]): Unit = {
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
+      .map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+    iterateNeighbors(scalaIterable, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
new file mode 100644
index 0000000..cefc277
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import java.lang
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T]
+  extends org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
+
+  /**
+   * Scala-side callback: receives the vertex together with its neighborhood as
+   * (edge, vertex) pairs and a collector for emitting results.
+   */
+  def iterateNeighbors(
+      vertex: Vertex[K, VV],
+      neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])],
+      out: Collector[T]): Unit
+
+  /**
+   * Converts the Tuple2-based java.lang.Iterable into an Iterable of Scala pairs
+   * and forwards the call to the Scala-side callback.
+   */
+  override def iterateNeighbors(
+      vertex: Vertex[K, VV],
+      neighbors: lang.Iterable[Tuple2[Edge[K, EV], Vertex[K, VV]]],
+      out: Collector[T]): Unit = {
+    val neighborPairs = scala.collection.JavaConversions
+      .iterableAsScalaIterable(neighbors)
+      .map(pair => (pair.f0, pair.f1))
+    iterateNeighbors(vertex, neighborPairs, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
new file mode 100644
index 0000000..b3da520
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.library.GSAConnectedComponents
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in [[org.apache.flink.graph.library]]. 
+ * 
+ * In particular, this example uses the
+ * [[org.apache.flink.graph.library.GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{{
+ *   ConnectedComponents <edge path> <result path> <number of iterations>
+ *   }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+
+  /**
+   * Program entry point: parses the arguments, builds the graph from the edge data set,
+   * runs GSAConnectedComponents on it and emits the result.
+   *
+   * @param args either empty (built-in default data is used) or
+   *             <edge path> <output path> <num iterations>
+   */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+    // emit result
+    if (fileOutput) {
+      components.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Connected Components Example")
+    } else {
+      components.print()
+    }
+  }
+
+  /** Mapper handed to Graph.fromDataSet; returns the id it is given unchanged. */
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long): Long = id
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+  private var fileOutput = false
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+  /**
+   * Reads the command line arguments into the fields above.
+   *
+   * With no arguments the defaults are kept and a hint is printed. With exactly three
+   * arguments file input/output is enabled and the iteration count is taken from the
+   * third argument. Any other argument count prints the usage to stderr.
+   *
+   * @param args the command line arguments
+   * @return false if the number of arguments is invalid, true otherwise
+   * @throws NumberFormatException if the third argument is not a valid integer
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 0) {
+      System.out.println("Executing ConnectedComponents example with default parameters" +
+        " and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+      true
+    } else if (args.length != 3) {
+      // The result of this branch is the result of the method, so a wrong argument
+      // count actually aborts instead of falling through to args(0)/args(1).
+      System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+      false
+    } else {
+      fileOutput = true
+      edgesInputPath = args(0)
+      outputPath = args(1)
+      // parse the third argument; the literal (2).toInt always yielded 2
+      maxIterations = args(2).toInt
+      true
+    }
+  }
+
+  /**
+   * Builds the edge data set: read from the tab-separated file at edgesInputPath when
+   * file output is enabled, otherwise created from
+   * ConnectedComponentsDefaultData.DEFAULT_EDGES. Every edge carries a NullValue.
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    } else {
+      val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    }
+  }
+}