You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:50 UTC
[12/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
deleted file mode 100644
index ca15dab..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.Collector
-
-
-abstract class NeighborsFunction[K, VV, EV, T] extends org.apache.flink.graph
-.NeighborsFunction[K, VV, EV, T] {
-
- def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T])
-
- override def iterateNeighbors(neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K,
- VV]]], out: Collector[T]) = {
- val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
- .map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
- iterateNeighbors(scalaIterable, out)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
deleted file mode 100644
index cefc277..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import java.lang
-
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.Collector
-
-
-abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
-.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
-
- def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])
- ], out: Collector[T]): Unit
-
- override def iterateNeighbors(vertex: Vertex[K, VV], neighbors: lang.Iterable[Tuple2[Edge[K,
- EV], Vertex[K, VV]]], out: Collector[T]): Unit = {
- val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
- .map(jtuple => (jtuple.f0, jtuple.f1))
- iterateNeighbors(vertex, scalaIterable, out)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
deleted file mode 100644
index b3da520..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.library.GSAConnectedComponents
-import java.lang.Long
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in [[org.apache.flink.graph.library]].
- *
- * In particular, this example uses the
- * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 and 1-3.
- *
- * Usage {{
- * ConnectedComponents <edge path> <result path> <number of iterations>
- * }}
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
- */
-object ConnectedComponents {
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
- val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
-
- val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
-
-
- // emit result
- if (fileOutput) {
- components.writeAsCsv(outputPath, "\n", ",")
- env.execute("Connected Components Example")
- } else {
- components.print()
- }
- }
-
- private final class InitVertices extends MapFunction[Long, Long] {
- override def map(id: Long) = {id}
- }
-
- // ***********************************************************************
- // UTIL METHODS
- // ***********************************************************************
-
- private var fileOutput = false
- private var edgesInputPath: String = null
- private var outputPath: String = null
- private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
-
- private def parseParameters(args: Array[String]): Boolean = {
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>")
- false
- }
- fileOutput = true
- edgesInputPath = args(0)
- outputPath = args(1)
- maxIterations = (2).toInt
- } else {
- System.out.println("Executing ConnectedComponents example with default parameters" +
- " and built-in default data.")
- System.out.println(" Provide parameters to read input data from files.")
- System.out.println(" See the documentation for the correct format of input files.")
- System.out.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>");
- }
- true
- }
-
- private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
- if (fileOutput) {
- env.readCsvFile[(Long, Long)](edgesInputPath,
- lineDelimiter = "\n",
- fieldDelimiter = "\t")
- .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
- } else {
- val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
- case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
- }
- env.fromCollection(edgeData).map(
- edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
deleted file mode 100644
index 2dc272c..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-import org.apache.flink.graph.gsa.GatherFunction
-import org.apache.flink.graph.gsa.Neighbor
-import org.apache.flink.graph.gsa.SumFunction
-import org.apache.flink.graph.gsa.ApplyFunction
-
-/**
- * This example shows how to use Gelly's gather-sum-apply iterations.
- *
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object GSASingleSourceShortestPaths {
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
- val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
- // Execute the gather-sum-apply iteration
- val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
- new UpdateDistance, maxIterations)
-
- // Extract the vertices as the result
- val singleSourceShortestPaths = result.getVertices
-
- // emit result
- if (fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
- env.execute("GSA Single Source Shortest Paths Example")
- } else {
- singleSourceShortestPaths.print()
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Single Source Shortest Path UDFs
- // --------------------------------------------------------------------------------------------
-
- private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
- override def map(id: Long) = {
- if (id.equals(srcId)) {
- 0.0
- } else {
- Double.PositiveInfinity
- }
- }
- }
-
- private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
- override def gather(neighbor: Neighbor[Double, Double]) = {
- neighbor.getNeighborValue + neighbor.getEdgeValue
- }
- }
-
- private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
- override def sum(newValue: Double, currentValue: Double) = {
- Math.min(newValue, currentValue)
- }
- }
-
- private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
- override def apply(newDistance: Double, oldDistance: Double) = {
- if (newDistance < oldDistance) {
- setResult(newDistance)
- }
- }
- }
-
- // **************************************************************************
- // UTIL METHODS
- // **************************************************************************
-
- private var fileOutput = false
- private var srcVertexId = 1L
- private var edgesInputPath: String = null
- private var outputPath: String = null
- private var maxIterations = 5
-
- private def parseParameters(args: Array[String]): Boolean = {
- if(args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>")
- false
- }
- fileOutput = true
- srcVertexId = args(0).toLong
- edgesInputPath = args(1)
- outputPath = args(2)
- maxIterations = (3).toInt
- } else {
- System.out.println("Executing Single Source Shortest Paths example "
- + "with default parameters and built-in default data.")
- System.out.println(" Provide parameters to read input data from files.")
- System.out.println(" See the documentation for the correct format of input files.")
- System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- }
- true
- }
-
- private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
- if (fileOutput) {
- env.readCsvFile[(Long, Long, Double)](edgesInputPath,
- lineDelimiter = "\n",
- fieldDelimiter = "\t")
- .map(new Tuple3ToEdgeMap[Long, Double]())
- } else {
- val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
- case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
- z.asInstanceOf[Double])
- }
- env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
deleted file mode 100644
index 4eed824..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.
- *
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- * <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- /** create the graph **/
- val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
- /** get the number of vertices **/
- val numVertices = graph.numberOfVertices;
-
- /** get the number of edges **/
- val numEdges = graph.numberOfEdges;
-
- /** compute the average node degree **/
- val verticesWithDegrees = graph.getDegrees;
- val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
-
- /** find the vertex with the maximum in-degree **/
- val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
- /** find the vertex with the minimum in-degree **/
- val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
- /** find the vertex with the maximum out-degree **/
- val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
- /** find the vertex with the minimum out-degree **/
- val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
- /** print the results **/
- env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
- env.fromElements(numEdges).printOnTaskManager("Total number of edges")
- avgDegree.printOnTaskManager("Average node degree")
- maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
- minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
- maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
- minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
- }
-
- private def parseParameters(args: Array[String]): Boolean = {
- if (args.length > 0) {
- fileOutput = true
- if (args.length == 1) {
- edgesPath = args(0)
- true
- } else {
- System.err.println("Usage: GraphMetrics <edges path>")
- false
- }
- } else {
- System.out.println("Executing GraphMetrics example with built-in default data.")
- System.out.println(" Provide parameters to read input data from a file.")
- System.out.println(" Usage: GraphMetrics <edges path>")
- true
- }
- }
-
- private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
- if (fileOutput) {
- env.readCsvFile[(Long, Long)](
- edgesPath,
- fieldDelimiter = "\t").map(
- in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
- } else {
- env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
- (key: Long, out: Collector[Edge[Long, NullValue]]) => {
- val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
- for ( i <- 0 to numOutEdges ) {
- var target: Long = ((Math.random() * numVertices) + 1).toLong
- new Edge[Long, NullValue](key, target, NullValue.getInstance())
- }
- })
- }
- }
-
- private var fileOutput: Boolean = false
- private var edgesPath: String = null
- private var outputPath: String = null
- private val numVertices = 100
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
deleted file mode 100644
index 65a8e7f..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
-import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-
-/**
- * This example shows how to use Gelly's vertex-centric iterations.
- *
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object SingleSourceShortestPaths {
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
- val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
- // Execute the vertex-centric iteration
- val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
- new MinDistanceMessenger, maxIterations)
-
- // Extract the vertices as the result
- val singleSourceShortestPaths = result.getVertices
-
- // emit result
- if (fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
- env.execute("Single Source Shortest Paths Example")
- } else {
- singleSourceShortestPaths.print()
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Single Source Shortest Path UDFs
- // --------------------------------------------------------------------------------------------
-
- private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
- override def map(id: Long) = {
- if (id.equals(srcId)) {
- 0.0
- } else {
- Double.PositiveInfinity
- }
- }
- }
-
- /**
- * Function that updates the value of a vertex by picking the minimum
- * distance from all incoming messages.
- */
- private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
-
- override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
- var minDistance = Double.MaxValue
- while (inMessages.hasNext) {
- var msg = inMessages.next
- if (msg < minDistance) {
- minDistance = msg
- }
- }
- if (vertex.getValue > minDistance) {
- setNewVertexValue(minDistance)
- }
- }
- }
-
- /**
- * Distributes the minimum distance associated with a given vertex among all
- * the target vertices summed up with the edge's value.
- */
- private final class MinDistanceMessenger extends
- MessagingFunction[Long, Double, Double, Double] {
-
- override def sendMessages(vertex: Vertex[Long, Double]) {
- for (edge: Edge[Long, Double] <- getEdges) {
- sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
- }
- }
- }
-
- // ****************************************************************************
- // UTIL METHODS
- // ****************************************************************************
-
- private var fileOutput = false
- private var srcVertexId = 1L
- private var edgesInputPath: String = null
- private var outputPath: String = null
- private var maxIterations = 5
-
- private def parseParameters(args: Array[String]): Boolean = {
- if(args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>")
- false
- }
- fileOutput = true
- srcVertexId = args(0).toLong
- edgesInputPath = args(1)
- outputPath = args(2)
- maxIterations = (3).toInt
- } else {
- System.out.println("Executing Single Source Shortest Paths example "
- + "with default parameters and built-in default data.")
- System.out.println(" Provide parameters to read input data from files.")
- System.out.println(" See the documentation for the correct format of input files.")
- System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- }
- true
- }
-
- private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
- if (fileOutput) {
- env.readCsvFile[(Long, Long, Double)](edgesInputPath,
- lineDelimiter = "\n",
- fieldDelimiter = "\t")
- .map(new Tuple3ToEdgeMap[Long, Double]())
- } else {
- val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
- case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
- z.asInstanceOf[Double])
- }
- env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
deleted file mode 100644
index 159a100..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.graph.{Graph => JGraph}
-
-import _root_.scala.reflect.ClassTag
-
-
-package object scala {
- private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
- EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
deleted file mode 100644
index 909dbb4..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Edge
-
-@SerialVersionUID(1L)
-class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
-
- override def map(value: Edge[K, EV]): (K, K, EV) = {
- (value.getSource, value.getTarget, value.getValue)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
deleted file mode 100644
index fd6b8c5..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Vertex
-
-@SerialVersionUID(1L)
-class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
-
- override def map(value: (K, VV)): Vertex[K, VV] = {
- new Vertex(value._1, value._2)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
deleted file mode 100644
index d0e07cc..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Edge
-
-@SerialVersionUID(1L)
-class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
-
- override def map(value: (K, K, EV)): Edge[K, EV] = {
- new Edge(value._1, value._2, value._3)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
deleted file mode 100644
index faf4e10..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Vertex
-
-@SerialVersionUID(1L)
-class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
-
- override def map(value: Vertex[K, VV]): (K, VV) = {
- (value.getId, value.getValue)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
deleted file mode 100644
index 55faee3..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.lang.reflect.Method
-import org.apache.flink.graph.scala._
-import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.graph.{Graph => JavaGraph}
-import scala.language.existentials
-import org.junit.Test
-
-/**
- * This checks whether the Gelly Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for Gelly.
- */
-class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
-
- override def isExcludedByName(method: Method): Boolean = {
- val name = method.getDeclaringClass.getName + "." + method.getName
- val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
- excludedNames.contains(name)
- }
-
- @Test
- override def testCompleteness(): Unit = {
- checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
deleted file mode 100644
index 1c2cf54..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.{Edge, Vertex}
-
-object TestGraphUtils {
-
- def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
- return env.fromCollection(getLongLongVertices)
- }
-
- def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
- return env.fromCollection(getLongLongEdges)
- }
-
- def getLongLongVertices: List[Vertex[Long, Long]] = {
- List(
- new Vertex[Long, Long](1L, 1L),
- new Vertex[Long, Long](2L, 2L),
- new Vertex[Long, Long](3L, 3L),
- new Vertex[Long, Long](4L, 4L),
- new Vertex[Long, Long](5L, 5L)
- )
- }
-
- def getLongLongEdges: List[Edge[Long, Long]] = {
- List(
- new Edge[Long, Long](1L, 2L, 12L),
- new Edge[Long, Long](1L, 3L, 13L),
- new Edge[Long, Long](2L, 3L, 23L),
- new Edge[Long, Long](3L, 4L, 34L),
- new Edge[Long, Long](3L, 5L, 35L),
- new Edge[Long, Long](4L, 5L, 45L),
- new Edge[Long, Long](5L, 1L, 51L)
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
deleted file mode 100644
index b347049..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
- private var expectedResult: String = null
-
- @Test
- @throws(classOf[Exception])
- def testInDegrees {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.inDegrees.collect().toList
- expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testOutDegrees {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.outDegrees.collect().toList
- expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testGetDegrees {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getDegrees.collect().toList
- expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
deleted file mode 100644
index 6ceaf16..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-import java.io.IOException
-import org.apache.flink.core.fs.FileInputSplit
-import java.io.File
-import java.io.OutputStreamWriter
-import java.io.FileOutputStream
-import java.io.FileOutputStream
-import com.google.common.base.Charsets
-import org.apache.flink.core.fs.Path
-import org.apache.flink.types.NullValue
-import org.apache.flink.api.common.functions.MapFunction
-
-@RunWith(classOf[Parameterized])
-class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
- private var expectedResult: String = null
-
- @Test
- @throws(classOf[Exception])
- def testCsvWithValues {
- /*
- * Test with two Csv files, both vertices and edges have values
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val verticesContent = "1,1\n2,2\n3,3\n"
- val verticesSplit = createTempFile(verticesContent)
- val edgesContent = "1,2,ot\n3,2,tt\n3,1,to\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, Long, String](
- readVertices = true,
- pathVertices = verticesSplit.getPath.toString,
- pathEdges = edgesSplit.getPath.toString,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvNoEdgeValues {
- /*
- * Test with two Csv files; edges have no values
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val verticesContent = "1,one\n2,two\n3,three\n"
- val verticesSplit = createTempFile(verticesContent)
- val edgesContent = "1,2\n3,2\n3,1\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, String, NullValue](
- readVertices = true,
- pathVertices = verticesSplit.getPath.toString,
- pathEdges = edgesSplit.getPath.toString,
- hasEdgeValues = false,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvWithMapperValues {
- /*
- * Test with edges Csv file and vertex mapper initializer
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edgesContent = "1,2,12\n3,2,32\n3,1,31\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, Double, Long](
- readVertices = false,
- pathEdges = edgesSplit.getPath.toString,
- mapper = new VertexDoubleIdAssigner(),
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvNoVertexValues {
- /*
- * Test with edges Csv file: no vertex values
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edgesContent = "1,2,12\n3,2,32\n3,1,31\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, NullValue, Long](
- readVertices = false,
- pathEdges = edgesSplit.getPath.toString,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
- "3,1,(null),(null),31\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvNoValues {
- /*
- * Test with edges Csv file: neither vertex nor edge values
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val edgesContent = "1,2\n3,2\n3,1\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, NullValue, NullValue](
- readVertices = false,
- pathEdges = edgesSplit.getPath.toString,
- hasEdgeValues = false,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,(null),(null),(null)\n" +
- "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvOptionsVertices {
- /*
- * Test the options for vertices: delimiters, comments, ignore first line.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val verticesContent = "42#42\t" + "%this-is-a-comment\t" +
- "1#1\t" + "2#2\t" + "3#3\t"
- val verticesSplit = createTempFile(verticesContent)
- val edgesContent = "1,2,ot\n3,2,tt\n3,1,to\n"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, Long, String](
- readVertices = true,
- pathVertices = verticesSplit.getPath.toString,
- lineDelimiterVertices = "\t",
- fieldDelimiterVertices = "#",
- ignoreFirstLineVertices = true,
- ignoreCommentsVertices = "%",
- pathEdges = edgesSplit.getPath.toString,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @Test
- @throws(classOf[Exception])
- def testCsvOptionsEdges {
- /*
- * Test the options for edges: delimiters, comments, ignore first line.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val verticesContent = "1,1\n2,2\n3,3\n"
- val verticesSplit = createTempFile(verticesContent)
- val edgesContent = "42#42#ignore&" + "1#2#ot&" + "3#2#tt&" + "3#1#to&" +
- "//this-is-a-comment"
- val edgesSplit = createTempFile(edgesContent)
- val graph = Graph.fromCsvReader[Long, Long, String](
- pathVertices = verticesSplit.getPath.toString,
- readVertices = true,
- lineDelimiterEdges = "&",
- fieldDelimiterEdges = "#",
- ignoreFirstLineEdges = true,
- ignoreCommentsEdges = "//",
- pathEdges = edgesSplit.getPath.toString,
- env = env)
-
- val result = graph.getTriplets.collect()
- expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
- TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
- }
-
- @throws(classOf[IOException])
- def createTempFile(content: String): FileInputSplit = {
- val tempFile = File.createTempFile("test_contents", "tmp")
- tempFile.deleteOnExit()
-
- val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), Charsets.UTF_8)
- wrt.write(content)
- wrt.close()
-
- new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
- Array("localhost"));
- }
-
- final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
- @throws(classOf[Exception])
- def map(id: Long): Double = {id.toDouble}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
deleted file mode 100644
index 4b776e2..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
- private var expectedResult: String = null
-
- @Test
- @throws(classOf[Exception])
- def testAddVertex {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
- val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
- val res = newgraph.getVertices.collect().toList
- expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddVertexExisting {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
- val res = newgraph.getVertices.collect().toList
- expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddVertexNoEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
- val res = newgraph.getVertices.collect().toList
- expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
- val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
- new Vertex[Long, Long](7L, 7L)))
- val res = newgraph.getVertices.collect().toList
- expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddVerticesExisting {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
- val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
- new Vertex[Long, Long](6L, 6L)))
- val res = newgraph.getVertices.collect().toList
- expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveVertex {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveInvalidVertex {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
- "45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
- new Vertex[Long, Long](2L, 2L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveValidAndInvalidVertex {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
- new Vertex[Long, Long](6L, 6L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddEdge {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
- 1L), 61L)
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
- "45\n" + "5,1,51\n" + "6,1,61\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
- new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
- "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddEdgesInvalidVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
- new Edge(7L, 8L, 78L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
- "35\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testAddExistingEdge {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
- 2L), 12L)
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
- "35\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveEdge {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveInvalidEdge {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
- "45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
- new Edge(4L, 5L, 45L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testRemoveSameEdgeTwiceEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
- new Edge(1L, 2L, 12L)))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
deleted file mode 100644
index 7f7ebc0..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
- private var expectedResult: String = null
-
- @Test
- @throws(classOf[Exception])
- def testUndirected {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getUndirected.getEdges.collect().toList;
-
- expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
- "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
- "5,1,51\n" + "1,5,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testReverse {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.reverse().getEdges.collect().toList;
-
- expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
- "45\n" + "1,5,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testSubGraph {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
- @throws(classOf[Exception])
- def filter(vertex: Vertex[Long, Long]): Boolean = {
- return (vertex.getValue > 2)
- }
- }, new FilterFunction[Edge[Long, Long]] {
-
- @throws(classOf[Exception])
- override def filter(edge: Edge[Long, Long]): Boolean = {
- return (edge.getValue > 34)
- }
- }).getEdges.collect().toList;
-
- expectedResult = "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testSubGraphSugar {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.subgraph(
- vertex => vertex.getValue > 2,
- edge => edge.getValue > 34
- ).getEdges.collect().toList;
-
- expectedResult = "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testFilterOnVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
- @throws(classOf[Exception])
- def filter(vertex: Vertex[Long, Long]): Boolean = {
- vertex.getValue > 2
- }
- }).getEdges.collect().toList;
-
- expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testFilterOnVerticesSugar {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.filterOnVertices(
- vertex => vertex.getValue > 2
- ).getEdges.collect().toList;
-
- expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testFilterOnEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
- @throws(classOf[Exception])
- def filter(edge: Edge[Long, Long]): Boolean = {
- edge.getValue > 34
- }
- }).getEdges.collect().toList;
-
- expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testFilterOnEdgesSugar {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.filterOnEdges(
- edge => edge.getValue > 34
- ).getEdges.collect().toList;
-
- expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testNumberOfVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = env.fromElements(graph.numberOfVertices).collect().toList
- expectedResult = "5"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testNumberOfEdges {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = env.fromElements(graph.numberOfEdges).collect().toList
- expectedResult = "7"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testVertexIds {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getVertexIds.collect().toList
- expectedResult = "1\n2\n3\n4\n5\n"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testEdgesIds {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getEdgeIds.collect().toList
- expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
- "(5,1)\n"
- TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testUnion {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
- new Vertex[Long, Long](6L, 6L)
- )
- val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
- new Edge[Long, Long](6L, 1L, 61L)
- )
-
- val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
- "45\n" + "5,1,51\n" + "6,1,61\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testDifference {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
- new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
- new Vertex[Long, Long](6L, 6L)
- )
- val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
- new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
- new Edge[Long, Long](6L, 3L, 63L)
- )
-
- val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "4,5,45\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testDifferenceNoCommonVertices {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
- new Vertex[Long, Long](6L, 6L)
- )
- val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
- new Edge[Long, Long](6L, 6L, 66L)
- )
-
- val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
- val res = newgraph.getEdges.collect().toList
- expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
- "45\n" + "5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-
- @Test
- @throws(classOf[Exception])
- def testTriplets {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
- .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getTriplets.collect().toList
- expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
- "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
- TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
- }
-}