You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:11 UTC
[29/60] git commit: Added TriangleEnumeration examples for reworked
Scala API.
Added TriangleEnumeration examples for reworked Scala API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd9e27e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd9e27e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd9e27e9
Branch: refs/heads/master
Commit: dd9e27e98ca4428187e235e7f335ec0393dcbb8a
Parents: e0f2440
Author: Fabian Hueske <fh...@apache.org>
Authored: Sat Sep 6 00:59:44 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200
----------------------------------------------------------------------
.../java/graph/util/EnumTrianglesData.java | 34 ++-
.../scala/graph/EnumTrianglesBasic.scala | 179 ++++++++++++++
.../graph/EnumTrianglesOnEdgesWithDegrees.scala | 125 ----------
.../examples/scala/graph/EnumTrianglesOpt.scala | 235 +++++++++++++++++++
4 files changed, 437 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
index 87d998b..02c0531 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
@@ -19,6 +19,9 @@
package org.apache.flink.example.java.graph.util;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
@@ -30,18 +33,27 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
*/
public class EnumTrianglesData {
+ /**
+  * Default edges of the example graph; each row holds the two vertex IDs of one edge.
+  * The int literals are autoboxed to Integer because the rows are typed Object[].
+  * Also read directly by the Scala triangle-enumeration examples.
+  */
+ public static final Object[][] EDGES = {
+ {1, 2},
+ {1, 3},
+ {1 ,4},
+ {1, 5},
+ {2, 3},
+ {2, 5},
+ {3, 4},
+ {3, 7},
+ {3, 8},
+ {5, 6},
+ {7, 8}
+ };
+
+ /** Returns the default edges ({@link #EDGES}) as a DataSet of {@link Edge} objects. */
 public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
- return env.fromElements(new Edge(1, 2),
- new Edge(1, 3),
- new Edge(1, 4),
- new Edge(1, 5),
- new Edge(2, 3),
- new Edge(2, 5),
- new Edge(3, 4),
- new Edge(3, 7),
- new Edge(3, 8),
- new Edge(5, 6),
- new Edge(7, 8));
+ List<Edge> edges = new ArrayList<Edge>();
+ for(Object[] e : EDGES) {
+ // rows are untyped Object[], so cast each element back to Integer
+ edges.add(new Edge((Integer)e[0], (Integer)e[1]));
+ }
+
+ return env.fromCollection(edges);
 }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
new file mode 100644
index 0000000..8672b2c
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.example.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+import scala.collection.mutable.MutableList
+
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ *
+ * <p>
+ * The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads for which no third edge exists that
+ * closes the triangle are filtered out.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges
+ * (1)-(2), (2)-(12), (1)-(12), and (42)-(63), which include one triangle:
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ *
+ * Self-loops are ignored. Each undirected edge is assumed to appear only once in the input;
+ * duplicated edges lead to duplicated triangles.
+ *
+ * Usage:
+ * {{{
+ *   EnumTriangleBasic <edge path> <result path>
+ * }}}
+ * <br>
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.example.java.graph.util.EnumTrianglesData]]
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Scala case classes as data types
+ * <li>Group Sorting
+ * </ul>
+ *
+ */
+object EnumTrianglesBasic {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+
+    val edgesById = edges
+      // a self-loop (v, v) cannot be part of a triangle, but together with an edge (v, w)
+      // it would yield the bogus triad (v, v, w), which the join below would accept
+      .filter(e => e.v1 != e.v2)
+      // project edges by vertex id: the vertex with the smaller id comes first, so that
+      // (a, b) and (b, a) describe the same undirected edge
+      .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
+
+    val triangles = edgesById
+      // build triads
+      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new TriadBuilder())
+      // filter triads: keep a triad (v1, v2, v3) only if the closing edge (v2, v3) exists
+      .join(edgesById).where(1, 2).equalTo(0, 1) { (t, _) => t }
+
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      triangles.print()
+    }
+
+    // execute program
+    env.execute("TriangleEnumeration Example")
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** An edge between the vertices with ids `v1` and `v2`. */
+  case class Edge(v1: Int, v2: Int)
+
+  /** Three vertices where `v1` is connected to both `v2` and `v3`. */
+  case class Triad(v1: Int, v2: Int, v3: Int)
+
+
+  // *************************************************************************
+  //     USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Builds triads (triples of vertices) from pairs of edges that share a vertex.
+   * The first vertex of a triad is the shared vertex, the second and third vertex are ordered
+   * by vertex id.
+   * Assumes that input edges share the first vertex and are in ascending order of the second
+   * vertex.
+   */
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    // second vertices of the edges read so far in the current group; reused across groups
+    val vertices = MutableList[Int]()
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]): Unit = {
+
+      // clear vertex list (it still holds the vertices of the previous group)
+      vertices.clear()
+
+      // build and emit triads
+      for (e <- edges.asScala) {
+        // combine the new vertex with all previously read vertices; because the group is
+        // sorted ascending on the second vertex, every earlier vertex has a smaller id
+        for (v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the program arguments into `fileOutput`, `edgePath` and `outputPath`.
+   *
+   * @return `false` if arguments were given but not exactly two (the usage is printed then),
+   *         `true` otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
+        // previously this value was discarded and the method fell through to `true`
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")
+      true
+    }
+  }
+
+  /**
+   * Reads the edges from `edgePath` if paths were given on the command line, otherwise builds
+   * them from [[org.apache.flink.example.java.graph.util.EnumTrianglesData]].
+   */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    // `fileOutput` is set by parseParameters whenever input and output paths are given
+    if (fileOutput) {
+      env.readCsvFile[(Int, Int)](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+        .map { x => Edge(x._1, x._2) }
+    } else {
+      // rows of EDGES are Object[] holding boxed Integers
+      val edges = EnumTrianglesData.EDGES.map { case Array(v1: Integer, v2: Integer) => Edge(v1, v2) }
+      env.fromCollection(edges)
+    }
+  }
+
+
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
deleted file mode 100644
index cf943be..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.graph
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import scala.math._
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.apache.flink.api.scala.analysis.GlobalSchemaPrinter
-//
-//object RunEnumTrianglesOnEdgesWithDegrees {
-// def main(args: Array[String]) {
-// val enumTriangles = new EnumTrianglesOnEdgesWithDegrees
-// if (args.size < 3) {
-// println(enumTriangles.getDescription)
-// return
-// }
-// val plan = enumTriangles.getScalaPlan(args(0).toInt, args(1), args(2))
-// LocalExecutor.execute(plan)
-// }
-//}
-//
-///**
-// * Enumerates all triangles build by three connected vertices in a graph.
-// * The graph is represented as edges (pairs of vertices) with annotated vertex degrees. *
-// */
-//class EnumTrianglesOnEdgesWithDegrees extends Program with ProgramDescription with Serializable {
-// override def getDescription() = {
-// "Parameters: [numSubStasks] [input file] [output file]"
-// }
-// override def getPlan(args: String*) = {
-// getScalaPlan(args(0).toInt, args(1), args(2))
-// }
-//
-// /*
-// * Output formatting function for triangles.
-// */
-// def formatTriangle = (v1: Int, v2: Int, v3: Int) => "%d,%d,%d".format(v1, v2, v3)
-//
-// /*
-// * Extracts degree information and projects edges such that lower degree vertex comes first.
-// */
-// def projectVertexesWithDegrees(e: (String, String)): (Int, Int) = {
-// val v1 = e._1.split(",")
-// val v2 = e._2.split(",")
-// if (v1(1).toInt <= v2(1).toInt)
-// (v1(0).toInt, v2(0).toInt)
-// else
-// (v2(0).toInt, v1(0).toInt)
-// }
-//
-// /*
-// * Joins projected edges on lower vertex id.
-// * Emits a triad (triangle candidate with one missing edge) for each unique combination of edges.
-// * Ensures that vertex 2 and 3 are ordered by vertex id.
-// */
-// def buildTriads(eI : Iterator[(Int, Int)]): List[(Int, Int, Int)] = {
-// val eL = eI.toList
-// for (e1 <- eL;
-// e2 <- eL
-// if e1._2 < e2._2) yield
-// (e1._1, e1._2, e2._2)
-// }
-//
-// def getScalaPlan(numSubTasks: Int, edgeInput: String, triangleOutput: String) = {
-//
-// /*
-// * Input format for edges with degrees
-// * Edges are separated by new line '\n'.
-// * An edge is represented by two vertex IDs with associated vertex degrees.
-// * The format of an edge is "<vertexID1>,<vertexDegree1>|<vertexID2>,<vertexDegree2>"
-// */
-// val vertexesWithDegrees = DataSource(edgeInput, CsvInputFormat[(String, String)]("\n", '|'))
-//
-// /*
-// * Project edges such that vertex with lower degree comes first (record position 1) and remove the degrees.
-// */
-// val edgesByDegree = vertexesWithDegrees map { projectVertexesWithDegrees }
-//
-// /*
-// * Project edges such that vertex with lower ID comes first (record position) and remove degrees.
-// */
-// val edgesByID = edgesByDegree map { (x) => if (x._1 < x._2) (x._1, x._2) else (x._2, x._1) }
-//
-// /*
-// * Build triads by joining edges on common vertex.
-// */
-// val triads = edgesByDegree groupBy { _._1 } reduceGroup { buildTriads } flatMap {x => x.iterator }
-//
-// /*
-// * Join triads with projected edges to 'close' triads.
-// * This filters triads without a closing edge.
-// */
-// val triangles = triads join edgesByID where { t => (t._2, t._3) } isEqualTo { e => (e._1, e._2) } map { (t, e) => t }
-//
-// /*
-// * Emit triangles
-// */
-// val output = triangles.write(triangleOutput, DelimitedOutputFormat(formatTriangle.tupled))
-//
-// val plan = new ScalaPlan(Seq(output), "Enumerate Triangles on Edges with Degrees")
-// plan.setDefaultParallelism(numSubTasks)
-// plan
-// }
-//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
new file mode 100644
index 0000000..65c8b3e
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.example.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+import scala.collection.mutable.MutableList
+
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ *
+ * <p>
+ * The basic algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads for which no third edge exists that
+ * closes the triangle are filtered out.
+ *
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is
+ * quadratic <i>((n*(n-1))/2)</i>. Therefore, an optimization of the algorithm is to group edges
+ * on the vertex with the smaller degree to reduce the number of triads.
+ * This implementation extends the basic algorithm by computing the degrees of the edge
+ * vertices and grouping edges on the vertex with the smaller degree.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges
+ * (1)-(2), (2)-(12), (1)-(12), and (42)-(63), which include one triangle:
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ *
+ * Self-loops and duplicated edges in the input are ignored.
+ *
+ * Usage:
+ * {{{
+ *   EnumTriangleOpt <edge path> <result path>
+ * }}}
+ * <br>
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.example.java.graph.util.EnumTrianglesData]]
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Scala case classes as data types
+ * <li>Group Sorting
+ * </ul>
+ *
+ */
+object EnumTrianglesOpt {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+
+    val edgesWithDegrees = edges
+      // duplicate and switch edges
+      .flatMap(e => Array(e, Edge(e.v2, e.v1)))
+      // add degree of first vertex
+      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter())
+      // join degrees of vertices: for every edge, DegreeCounter emits exactly one record that
+      // carries the degree of v1 (with d2 == 0) and one that carries the degree of v2
+      // (with d1 == 0), so summing the degree slots merges both records
+      .groupBy(0, 2).reduce((e1, e2) => EdgeWithDegrees(e1.v1, e1.d1 + e2.d1, e1.v2, e1.d2 + e2.d2))
+
+    // project edges by degrees, vertex with smaller degree comes first; since v1 < v2 holds
+    // for every EdgeWithDegrees, ties are broken by putting the larger id first
+    val edgesByDegree = edgesWithDegrees
+      .map(e => if (e.d1 < e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1))
+    // project edges by id, vertex with smaller id comes first
+    val edgesById = edgesByDegree
+      .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
+
+    val triangles = edgesByDegree
+      // build triads
+      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new TriadBuilder())
+      // filter triads: keep a triad (v1, v2, v3) only if the closing edge (v2, v3) exists
+      .join(edgesById).where(1, 2).equalTo(0, 1) { (t, _) => t }
+
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      triangles.print()
+    }
+
+    // execute program
+    env.execute("TriangleEnumeration Example")
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** An edge between the vertices with ids `v1` and `v2`. */
+  case class Edge(v1: Int, v2: Int)
+
+  /** Three vertices where `v1` is connected to both `v2` and `v3`. */
+  case class Triad(v1: Int, v2: Int, v3: Int)
+
+  /** An edge (`v1`, `v2`) annotated with the degrees `d1` and `d2` of its vertices. */
+  case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int)
+
+
+  // *************************************************************************
+  //     USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Counts the number of distinct neighbors (the degree) of the vertex a group is keyed on.
+   * Assumes that all edges of a group share the first vertex. Self-loops and duplicate edges
+   * are ignored.
+   * Emits one edge per distinct neighbor, annotated with the degree of the shared vertex; the
+   * degree slot of the neighbor is set to 0. For each emitted edge, the first vertex is the
+   * vertex with the smaller id.
+   */
+  class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] {
+
+    // distinct neighbors of the current group vertex in first-seen order; a hash set keeps
+    // the duplicate check O(1) instead of scanning a list for every edge
+    private val vertices = scala.collection.mutable.LinkedHashSet[Int]()
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]): Unit = {
+
+      // empty vertex set (it still holds the neighbors of the previous group)
+      vertices.clear()
+
+      // collect all distinct neighbors, skipping self-loops
+      var groupVertex = 0
+      for (e <- edges.asScala) {
+        groupVertex = e.v1
+        if (e.v1 != e.v2) {
+          vertices += e.v2
+        }
+      }
+
+      // the number of distinct neighbors is the degree of groupVertex
+      val degree = vertices.size
+
+      // create and emit edges with degrees
+      for (v <- vertices) {
+        if (v < groupVertex) {
+          out.collect(EdgeWithDegrees(v, 0, groupVertex, degree))
+        } else {
+          out.collect(EdgeWithDegrees(groupVertex, degree, v, 0))
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds triads (triples of vertices) from pairs of edges that share a vertex.
+   * The first vertex of a triad is the shared vertex, the second and third vertex are ordered
+   * by vertex id.
+   * Assumes that input edges share the first vertex and are in ascending order of the second
+   * vertex.
+   */
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    // second vertices of the edges read so far in the current group; reused across groups
+    val vertices = MutableList[Int]()
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]): Unit = {
+
+      // clear vertex list (it still holds the vertices of the previous group)
+      vertices.clear()
+
+      // build and emit triads
+      for (e <- edges.asScala) {
+        // combine the new vertex with all previously read vertices; because the group is
+        // sorted ascending on the second vertex, every earlier vertex has a smaller id
+        for (v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the program arguments into `fileOutput`, `edgePath` and `outputPath`.
+   *
+   * @return `false` if arguments were given but not exactly two (the usage is printed then),
+   *         `true` otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")
+        // previously this value was discarded and the method fell through to `true`
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Optimized example with built-in default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println(" Usage: EnumTriangleOpt <edge path> <result path>")
+      true
+    }
+  }
+
+  /**
+   * Reads the edges from `edgePath` if paths were given on the command line, otherwise builds
+   * them from [[org.apache.flink.example.java.graph.util.EnumTrianglesData]].
+   */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    // `fileOutput` is set by parseParameters whenever input and output paths are given
+    if (fileOutput) {
+      env.readCsvFile[(Int, Int)](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+        .map { x => Edge(x._1, x._2) }
+    } else {
+      // rows of EDGES are Object[] holding boxed Integers
+      val edges = EnumTrianglesData.EDGES.map { case Array(v1: Integer, v2: Integer) => Edge(v1, v2) }
+      env.fromCollection(edges)
+    }
+  }
+
+
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
+
+}
\ No newline at end of file