You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:13 UTC

[31/60] git commit: Transitive closure Scala example

Transitive closure Scala example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e0f2440d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e0f2440d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e0f2440d

Branch: refs/heads/master
Commit: e0f2440d674dc6548c49ae92a8f1fcf42cd242f0
Parents: a8dd958
Author: Kostas Tzoumas <ko...@kostass-mbp.fritz.box>
Authored: Wed Sep 10 16:20:46 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200

----------------------------------------------------------------------
 .../java/graph/TransitiveClosureNaive.java      |  22 ++++
 flink-examples/flink-scala-examples/pom.xml     |  24 +++-
 .../scala/graph/TransitiveClosureNaive.scala    | 119 +++++++++++++++++++
 .../TransitiveClosureITCase.java                |  60 ++++++++++
 4 files changed, 224 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index 281439e..0745d73 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.example.java.graph;
 
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +31,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.util.Collector;
 
+import java.util.HashSet;
+import java.util.Set;
+
 @SuppressWarnings("serial")
 public class TransitiveClosureNaive implements ProgramDescription {
 
@@ -73,6 +77,24 @@ public class TransitiveClosureNaive implements ProgramDescription {
 					}
 				});
 
+		DataSet<Tuple2<Long,Long>> newPaths = paths
+				.coGroup(nextPaths)
+				.where(0).equalTo(0)
+				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					Set prevSet = new HashSet<Tuple2<Long,Long>>();
+					@Override
+					public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
+						for (Tuple2<Long,Long> prev : prevPaths) {
+							prevSet.add(prev);
+						}
+						for (Tuple2<Long,Long> next: nextPaths) {
+							if (!prevSet.contains(next)) {
+								out.collect(next);
+							}
+						}
+					}
+				});
+
 		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index a6801f8..3b50c8b 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -281,7 +281,29 @@ under the License.
 					</execution>
 					
 					-->
-		
+
+                    <execution>
+                        <id>TransitiveClosureNaive</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <!-- Builds a separate jar (classifier TransitiveClosureNaive) for the Scala transitive closure example. -->
+                        <configuration>
+                            <classifier>TransitiveClosureNaive</classifier>
+
+                            <archive>
+                                <manifestEntries>
+                                    <program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
+                                </manifestEntries>
+                            </archive>
+                            <!-- The include path must match the package of program-class above (…/scala/graph). -->
+                            <includes>
+                                <include>**/graph/TransitiveClosureNaive*.class</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+
 				</executions>
 			</plugin>
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 858ce30..86d83db 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -1,3 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import org.apache.flink.example.java.graph.util.ConnectedComponentsData
+import org.apache.flink.util.Collector
+
+object TransitiveClosureNaive {
+	// Naive transitive closure: every iteration joins the known paths with the edges to extend them by one hop.
+	// Without arguments the program runs on ConnectedComponentsData.EDGES and prints the result (see parseParameters).
+	def main (args: Array[String]): Unit = {
+		if (!parseParameters(args)) {
+			return
+		}
+
+		val env = ExecutionEnvironment.getExecutionEnvironment
+
+		val edges = getEdgesDataSet(env)
+
+		val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long,Long)] =>
+			// Extend each path by one edge, keep the previous paths, and drop duplicate (source, target) pairs.
+			val nextPaths = prevPaths
+				.join(edges)
+				.where(1).equalTo(0) {
+					(left,right) => Some((left._1,right._2))
+				}
+				.union(prevPaths)
+				.groupBy(0,1)
+				.reduce((l,r) => l)
+			// Paths of nextPaths that prevPaths lacks; returned below as the termination data set of the iteration.
+			val terminate = prevPaths
+				.coGroup(nextPaths)
+				.where(0).equalTo(0) {
+					(prev, next, out: Collector[(Long, Long)]) => {
+						val known = prev.toSet
+						for (n <- next)
+							if (!known.contains(n))
+								out.collect(n)
+					}
+			}
+			(nextPaths, terminate)
+		}
+
+		if (fileOutput)
+			paths.writeAsCsv(outputPath, "\n", " ")
+		else
+			paths.print()
+
+		env.execute("Scala Transitive Closure Example")
+
+
+	}
+
+	// Program configuration; these defaults stay in effect when no arguments are given.
+	private var fileOutput: Boolean = false
+	private var edgesPath: String = null
+	private var outputPath: String = null
+	private var maxIterations: Int = 10
+	// Accepts no arguments (built-in data) or exactly three; any other count prints the usage and returns false.
+	private def parseParameters(programArguments: Array[String]): Boolean = {
+		if (programArguments.length > 0) {
+			fileOutput = true
+			if (programArguments.length == 3) {
+				edgesPath = programArguments(0)
+				outputPath = programArguments(1)
+				maxIterations = Integer.parseInt(programArguments(2))
+			}
+			else {
+				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+				return false
+			}
+		}
+		else {
+			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.")
+			System.out.println("  Provide parameters to read input data from files.")
+			System.out.println("  See the documentation for the correct format of input files.")
+			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+		}
+		return true
+	}
+	// Reads space-delimited edges from edgesPath when file parameters were given, else uses ConnectedComponentsData.EDGES.
+	private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
+		if (fileOutput) {
+			env.readCsvFile[(Long, Long)](
+				edgesPath,
+				fieldDelimiter = ' ',
+				includedFields = Array(0, 1))
+				.map { x => (x._1, x._2)}
+		}
+		else {
+			val edgeData = ConnectedComponentsData.EDGES map {
+				case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+			}
+			env.fromCollection(edgeData)
+		}
+	}
+}
+
+
+
+
+
+
 ///**
 // * Licensed to the Apache Software Foundation (ASF) under one
 // * or more contributor license agreements.  See the NOTICE file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
new file mode 100644
index 0000000..1bf25a6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+
+import java.io.BufferedReader;
+
+import org.apache.flink.examples.scala.graph.TransitiveClosureNaive;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.testdata.TransitiveClosureData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+    // Arguments handed to ConnectedComponentsData.getRandomOddEvenEdges in preSubmit().
+    private static final int NUM_EDGES = 10000;
+    private static final int NUM_VERTICES = 1000;
+    private static final long SEED = 0xBADC0FFEEBEEFL;
+
+    private String edgesPath;
+    private String resultPath;
+
+    /** Creates the temp edge file from generated random odd/even edges and obtains the result path. */
+    @Override
+    protected void preSubmit() throws Exception {
+        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+        resultPath = getTempFilePath("results");
+    }
+
+    /** Runs the Scala example with the arguments: edges path, result path, "5". */
+    @Override
+    protected void testProgram() throws Exception {
+        TransitiveClosureNaive.main(new String[] {edgesPath, resultPath, "5"});
+    }
+
+    /** Hands every reader over the result path to TransitiveClosureData.checkOddEvenResult. */
+    @Override
+    protected void postSubmit() throws Exception {
+        for (BufferedReader resultReader : getResultReader(resultPath)) {
+            TransitiveClosureData.checkOddEvenResult(resultReader);
+        }
+    }
+}
+