You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:11 UTC

[29/60] git commit: Added TriangleEnumeration examples for reworked Scala API.

Added TriangleEnumeration examples for reworked Scala API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd9e27e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd9e27e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd9e27e9

Branch: refs/heads/master
Commit: dd9e27e98ca4428187e235e7f335ec0393dcbb8a
Parents: e0f2440
Author: Fabian Hueske <fh...@apache.org>
Authored: Sat Sep 6 00:59:44 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200

----------------------------------------------------------------------
 .../java/graph/util/EnumTrianglesData.java      |  34 ++-
 .../scala/graph/EnumTrianglesBasic.scala        | 179 ++++++++++++++
 .../graph/EnumTrianglesOnEdgesWithDegrees.scala | 125 ----------
 .../examples/scala/graph/EnumTrianglesOpt.scala | 235 +++++++++++++++++++
 4 files changed, 437 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
index 87d998b..02c0531 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/EnumTrianglesData.java
@@ -19,6 +19,9 @@
 
 package org.apache.flink.example.java.graph.util;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
@@ -30,18 +33,27 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
  */
 public class EnumTrianglesData {
 
+	public static final Object[][] EDGES = {
+		{1, 2},
+		{1, 3},
+		{1 ,4},
+		{1, 5},
+		{2, 3},
+		{2, 5},
+		{3, 4},
+		{3, 7},
+		{3, 8},
+		{5, 6},
+		{7, 8}
+	};
+	
 	public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
 		
-		return env.fromElements(new Edge(1, 2),
-								new Edge(1, 3),
-								new Edge(1, 4),
-								new Edge(1, 5),
-								new Edge(2, 3),
-								new Edge(2, 5),
-								new Edge(3, 4),
-								new Edge(3, 7),
-								new Edge(3, 8),
-								new Edge(5, 6),
-								new Edge(7, 8));
+		List<Edge> edges = new ArrayList<Edge>();
+		for(Object[] e : EDGES) {
+			edges.add(new Edge((Integer)e[0], (Integer)e[1]));
+		}
+		
+		return env.fromCollection(edges);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
new file mode 100644
index 0000000..8672b2c
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.example.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+import scala.collection.mutable.MutableList
+
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * <p>
+ * The algorithm works as follows: 
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *  
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space 
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: 
+ * {{{
+ * EnumTriangleBasic <edge path> <result path>
+ * }}}
+ * <br>
+ * If no parameters are provided, the program is run with default data from 
+ * [[org.apache.flink.example.java.graph.util.EnumTrianglesData]]
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom data types defined as Scala case classes
+ * <li>Group Sorting
+ * </ul>
+ * 
+ */
+object EnumTrianglesBasic {
+	
+	/**
+	 * Program entry point. Parses the arguments, builds the triangle enumeration
+	 * data flow and runs it.
+	 *
+	 * @param args either empty (built-in default data, result is printed) or
+	 *             `<edge path> <result path>`
+	 */
+	def main(args: Array[String]): Unit = {
+		// nothing is set up or executed when the arguments are rejected
+		if (parseParameters(args)) {
+
+			// set up execution environment
+			val env = ExecutionEnvironment.getExecutionEnvironment
+
+			// read input data
+			val edges = getEdgeDataSet(env)
+
+			// project edges by vertex id: the vertex with the smaller id comes first
+			val edgesById = edges.map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
+
+			val triangles = edgesById
+				// build triads: group on the first vertex, neighbors in ascending order
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new TriadBuilder())
+				// filter triads: keep those whose second and third vertex match an existing edge
+				.join(edgesById).where(1,2).equalTo(0,1) { (t, _) => Some(t) }
+
+			// emit result
+			if (fileOutput) {
+				triangles.writeAsCsv(outputPath, "\n", " ")
+			} else {
+				triangles.print()
+			}
+
+			// execute program
+			env.execute("TriangleEnumeration Example")
+		}
+	}
+
+	// *************************************************************************
+	//     USER DATA TYPES
+	// *************************************************************************
+
+	// An edge given by its two vertex ids. main() normalizes edges so that v1 holds the smaller id.
+	case class Edge(v1: Int, v2: Int) extends Serializable
+	// A triple of vertex ids. TriadBuilder emits it as (shared vertex, earlier-read neighbor, later-read neighbor).
+	case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+	
+		
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+		// second vertices of the edges read so far in the current group;
+		// the list is emptied at the start of every reduce() call
+		val vertices = MutableList[Integer]()
+
+		override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+
+			vertices.clear()
+
+			edges.asScala.foreach { edge =>
+				// pair the new neighbor with every neighbor read before it
+				vertices.foreach { seen => out.collect(Triad(edge.v1, seen, edge.v2)) }
+				// remember the neighbor only after its triads were emitted
+				vertices += edge.v2
+			}
+		}
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	/**
+	 * Interprets the command-line arguments and records them in `fileOutput`,
+	 * `edgePath` and `outputPath`.
+	 *
+	 * Without arguments the built-in default data is used and a hint is printed.
+	 * Exactly two arguments are taken as edge input path and result output path.
+	 * Any other number of arguments is rejected.
+	 *
+	 * @param args the program arguments
+	 * @return true if the program can proceed, false if the arguments were invalid
+	 *         (the usage line is then printed to stderr)
+	 */
+	private def parseParameters(args: Array[String]): Boolean = {
+		if (args.length > 0) {
+			fileOutput = true
+			if (args.length == 2) {
+				edgePath = args(0)
+				outputPath = args(1)
+				true
+			} else {
+				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
+				// every branch ends in its own result, so this rejection is actually returned
+				false
+			}
+		} else {
+			System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+			System.out.println("  Provide parameters to read input data from files.")
+			System.out.println("  See the documentation for the correct format of input files.")
+			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
+			true
+		}
+	}
+
+	/**
+	 * Provides the edge data set the program works on.
+	 * When `fileOutput` is set, space-separated vertex id pairs are read from `edgePath`;
+	 * otherwise the built-in EnumTrianglesData.EDGES are converted.
+	 */
+	private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+		if (!fileOutput) {
+			// default data: each entry of EDGES is matched as a two-element array of vertex ids
+			val defaultEdges = EnumTrianglesData.EDGES.map {
+				case Array(src, dst) => Edge(src.asInstanceOf[Int], dst.asInstanceOf[Int])
+			}
+			env.fromCollection(defaultEdges)
+		} else {
+			// fields 0 and 1 of every space-delimited line become the two vertex ids
+			val vertexPairs = env.readCsvFile[(Int, Int)](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+			vertexPairs.map { pair => Edge(pair._1, pair._2) }
+		}
+	}
+	
+	
+	// set to true by parseParameters when command-line arguments are present;
+	// main and getEdgeDataSet use it to choose files over the default data / printing
+	private var fileOutput: Boolean = false
+	// edge input path; stays null until parseParameters assigns args(0)
+	private var edgePath: String = null
+	// result output path; stays null until parseParameters assigns args(1)
+	private var outputPath: String = null
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
deleted file mode 100644
index cf943be..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOnEdgesWithDegrees.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.graph
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import scala.math._
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.apache.flink.api.scala.analysis.GlobalSchemaPrinter
-//
-//object RunEnumTrianglesOnEdgesWithDegrees {
-//  def main(args: Array[String]) {
-//    val enumTriangles = new EnumTrianglesOnEdgesWithDegrees
-//    if (args.size < 3) {
-//      println(enumTriangles.getDescription)
-//      return
-//    }
-//    val plan = enumTriangles.getScalaPlan(args(0).toInt, args(1), args(2))
-//    LocalExecutor.execute(plan)
-//  }
-//}
-//
-///**
-// * Enumerates all triangles build by three connected vertices in a graph.
-// * The graph is represented as edges (pairs of vertices) with annotated vertex degrees. *
-// */
-//class EnumTrianglesOnEdgesWithDegrees extends Program with ProgramDescription with Serializable {
-//  override def getDescription() = {
-//    "Parameters: [numSubStasks] [input file] [output file]"
-//  }
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(0).toInt, args(1), args(2))
-//  }
-//
-//  /*
-//   * Output formatting function for triangles.
-//   */
-//  def formatTriangle = (v1: Int, v2: Int, v3: Int) => "%d,%d,%d".format(v1, v2, v3)
-//
-//  /*
-//   * Extracts degree information and projects edges such that lower degree vertex comes first.
-//   */
-//  def projectVertexesWithDegrees(e: (String, String)): (Int, Int) = {
-//    val v1 = e._1.split(",")
-//    val v2 = e._2.split(",")
-//    if (v1(1).toInt <= v2(1).toInt)
-//      (v1(0).toInt, v2(0).toInt)
-//    else
-//      (v2(0).toInt, v1(0).toInt)
-//  }
-//
-//  /*
-//   * Joins projected edges on lower vertex id.
-//   * Emits a triad (triangle candidate with one missing edge) for each unique combination of edges.
-//   * Ensures that vertex 2 and 3 are ordered by vertex id.
-//   */
-//  def buildTriads(eI : Iterator[(Int, Int)]): List[(Int, Int, Int)] = {
-//    val eL = eI.toList
-//    for (e1 <- eL;
-//         e2 <- eL
-//         if e1._2 < e2._2) yield
-//      (e1._1, e1._2, e2._2)
-//  }
-//
-//  def getScalaPlan(numSubTasks: Int, edgeInput: String, triangleOutput: String) = {
-//
-//    /*
-//     * Input format for edges with degrees
-//     * Edges are separated by new line '\n'.
-//     * An edge is represented by two vertex IDs with associated vertex degrees.
-//     * The format of an edge is "<vertexID1>,<vertexDegree1>|<vertexID2>,<vertexDegree2>"
-//     */
-//    val vertexesWithDegrees = DataSource(edgeInput, CsvInputFormat[(String, String)]("\n", '|'))
-//
-//    /*
-//     * Project edges such that vertex with lower degree comes first (record position 1) and remove the degrees.
-//     */
-//    val edgesByDegree = vertexesWithDegrees map { projectVertexesWithDegrees }
-//
-//    /*
-//     * Project edges such that vertex with lower ID comes first (record position) and remove degrees.
-//     */
-//    val edgesByID = edgesByDegree map { (x) => if (x._1 < x._2) (x._1, x._2) else (x._2, x._1) }
-//
-//    /*
-//     * Build triads by joining edges on common vertex.
-//     */
-//    val triads = edgesByDegree groupBy { _._1 } reduceGroup { buildTriads } flatMap {x => x.iterator }
-//
-//    /*
-//     * Join triads with projected edges to 'close' triads.
-//     * This filters triads without a closing edge.
-//     */
-//    val triangles = triads join edgesByID where { t => (t._2, t._3) } isEqualTo { e => (e._1, e._2) } map { (t, e) => t }
-//
-//    /*
-//     * Emit triangles
-//     */
-//    val output = triangles.write(triangleOutput, DelimitedOutputFormat(formatTriangle.tupled))
-//
-//    val plan = new ScalaPlan(Seq(output), "Enumerate Triangles on Edges with Degrees")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9e27e9/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
new file mode 100644
index 0000000..65c8b3e
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.example.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+import scala.collection.mutable.MutableList
+
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * <p>
+ * The basic algorithm works as follows: 
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ * 
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+ * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to 
+ * reduce the number of triads. 
+ * This implementation extends the basic algorithm by computing output degrees of edge vertices and 
+ * grouping edges on the vertex with the smaller degree.
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space 
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from [[org.apache.flink.example.java.graph.util.EnumTrianglesData]].
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom data types defined as Scala case classes
+ * <li>Group Sorting
+ * </ul>
+ * 
+ */
+object EnumTrianglesOpt {
+	
+	/**
+	 * Program entry point. Parses the arguments, builds the degree-optimized triangle
+	 * enumeration data flow and runs it.
+	 *
+	 * @param args either empty (built-in default data, result is printed) or
+	 *             `<edge path> <result path>`
+	 */
+	def main(args: Array[String]): Unit = {
+		// nothing is set up or executed when the arguments are rejected
+		if (parseParameters(args)) {
+
+			// set up execution environment
+			val env = ExecutionEnvironment.getExecutionEnvironment
+
+			// read input data
+			val edges = getEdgeDataSet(env)
+
+			val edgesWithDegrees = edges
+				// duplicate and switch edges
+				.flatMap( e => Array(e, Edge(e.v2, e.v1)) )
+				// add degree of first vertex
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter())
+				// join degrees of vertices: DegreeCounter leaves the degree it does not know at 0,
+				// so the missing degree is taken from the other record of the same (v1, v2) group
+				.groupBy(0,2).reduce( (e1, e2) =>
+					if(e1.d2 == 0)
+						new EdgeWithDegrees(e1.v1, e1.d1, e1.v2, e2.d2)
+					else
+						new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2)
+				)
+
+			// project edges by degrees, vertex with smaller degree comes first
+			val edgesByDegree = edgesWithDegrees
+				.map(e => if (e.d1 < e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1) )
+			// project edges by Id, vertex with smaller Id comes first
+			val edgesById = edgesByDegree
+				.map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
+
+			val triangles = edgesByDegree
+				// build triads
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new TriadBuilder())
+				// filter triads
+				.join(edgesById).where(1,2).equalTo(0,1) { (t, _) => Some(t) }
+
+			// emit result
+			if (fileOutput) {
+				triangles.writeAsCsv(outputPath, "\n", " ")
+			} else {
+				triangles.print()
+			}
+
+			// execute program
+			env.execute("TriangleEnumeration Example")
+		}
+	}
+
+	// *************************************************************************
+	//     USER DATA TYPES
+	// *************************************************************************
+
+	// An edge given by its two vertex ids.
+	case class Edge(v1: Int, v2: Int) extends Serializable
+	// A triple of vertex ids. TriadBuilder emits it as (shared vertex, earlier-read neighbor, later-read neighbor).
+	case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+	// An edge whose vertices v1 and v2 carry degrees d1 and d2. DegreeCounter emits 0 for the
+	// degree of the vertex that is not the group vertex.
+	case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int) extends Serializable
+	
+		
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Counts the number of edges that share a common vertex.
+	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
+	 * For each emitted edge, the first vertex is the vertex with the smaller id.
+	 */
+	class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] {
+
+		// distinct neighbors of the group vertex; emptied at the start of every reduce() call
+		val vertices = MutableList[Integer]()
+		// first vertex of the edges in the group that is currently processed
+		var groupVertex = 0
+
+		override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]) = {
+
+			vertices.clear()
+
+			// gather each neighbor once, ignoring edges from a vertex to itself
+			edges.asScala.foreach { edge =>
+				groupVertex = edge.v1
+				val isSelfLoop = edge.v1 == edge.v2
+				if (!isSelfLoop && !vertices.contains(edge.v2)) {
+					vertices += edge.v2
+				}
+			}
+
+			// the number of distinct neighbors is the degree of the group vertex
+			val degree = vertices.length
+
+			// emit one annotated edge per neighbor with the smaller vertex id first;
+			// the degree of the neighbor is left at 0
+			vertices.foreach { neighbor =>
+				val annotated =
+					if (neighbor < groupVertex) new EdgeWithDegrees(neighbor, 0, groupVertex, degree)
+					else new EdgeWithDegrees(groupVertex, degree, neighbor, 0)
+				out.collect(annotated)
+			}
+		}
+	}
+	
+	/**
+	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+		// second vertices of the edges read so far in the current group;
+		// the list is emptied at the start of every reduce() call
+		val vertices = MutableList[Integer]()
+
+		override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+
+			vertices.clear()
+
+			edges.asScala.foreach { edge =>
+				// pair the new neighbor with every neighbor read before it
+				vertices.foreach { seen => out.collect(Triad(edge.v1, seen, edge.v2)) }
+				// remember the neighbor only after its triads were emitted
+				vertices += edge.v2
+			}
+		}
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	/**
+	 * Interprets the command-line arguments and records them in `fileOutput`,
+	 * `edgePath` and `outputPath`.
+	 *
+	 * Without arguments the built-in default data is used and a hint is printed.
+	 * Exactly two arguments are taken as edge input path and result output path.
+	 * Any other number of arguments is rejected.
+	 *
+	 * @param args the program arguments
+	 * @return true if the program can proceed, false if the arguments were invalid
+	 *         (the usage line is then printed to stderr)
+	 */
+	private def parseParameters(args: Array[String]): Boolean = {
+		if (args.length > 0) {
+			fileOutput = true
+			if (args.length == 2) {
+				edgePath = args(0)
+				outputPath = args(1)
+				true
+			} else {
+				// messages name this program (EnumTriangleOpt), matching the usage line in the Scaladoc
+				System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")
+				// every branch ends in its own result, so this rejection is actually returned
+				false
+			}
+		} else {
+			System.out.println("Executing Enum Triangles Opt example with built-in default data.")
+			System.out.println("  Provide parameters to read input data from files.")
+			System.out.println("  See the documentation for the correct format of input files.")
+			System.out.println("  Usage: EnumTriangleOpt <edge path> <result path>")
+			true
+		}
+	}
+
+	/**
+	 * Provides the edge data set the program works on.
+	 * When `fileOutput` is set, space-separated vertex id pairs are read from `edgePath`;
+	 * otherwise the built-in EnumTrianglesData.EDGES are converted.
+	 */
+	private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+		if (!fileOutput) {
+			// default data: each entry of EDGES is matched as a two-element array of vertex ids
+			val defaultEdges = EnumTrianglesData.EDGES.map {
+				case Array(src, dst) => Edge(src.asInstanceOf[Int], dst.asInstanceOf[Int])
+			}
+			env.fromCollection(defaultEdges)
+		} else {
+			// fields 0 and 1 of every space-delimited line become the two vertex ids
+			val vertexPairs = env.readCsvFile[(Int, Int)](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+			vertexPairs.map { pair => Edge(pair._1, pair._2) }
+		}
+	}
+	
+	
+	// set to true by parseParameters when command-line arguments are present;
+	// main and getEdgeDataSet use it to choose files over the default data / printing
+	private var fileOutput: Boolean = false
+	// edge input path; stays null until parseParameters assigns args(0)
+	private var edgePath: String = null
+	// result output path; stays null until parseParameters assigns args(1)
+	private var outputPath: String = null
+
+}
\ No newline at end of file