You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:13 UTC
[31/60] git commit: Transitive closure Scala example
Transitive closure Scala example
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e0f2440d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e0f2440d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e0f2440d
Branch: refs/heads/master
Commit: e0f2440d674dc6548c49ae92a8f1fcf42cd242f0
Parents: a8dd958
Author: Kostas Tzoumas <ko...@kostass-mbp.fritz.box>
Authored: Wed Sep 10 16:20:46 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200
----------------------------------------------------------------------
.../java/graph/TransitiveClosureNaive.java | 22 ++++
flink-examples/flink-scala-examples/pom.xml | 24 +++-
.../scala/graph/TransitiveClosureNaive.scala | 119 +++++++++++++++++++
.../TransitiveClosureITCase.java | 60 ++++++++++
4 files changed, 224 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index 281439e..0745d73 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -19,6 +19,7 @@
package org.apache.flink.example.java.graph;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
@@ -30,6 +31,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;
+import java.util.HashSet;
+import java.util.Set;
+
@SuppressWarnings("serial")
public class TransitiveClosureNaive implements ProgramDescription {
@@ -73,6 +77,24 @@ public class TransitiveClosureNaive implements ProgramDescription {
}
});
+ DataSet<Tuple2<Long,Long>> newPaths = paths
+ .coGroup(nextPaths)
+ .where(0).equalTo(0)
+ .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ Set prevSet = new HashSet<Tuple2<Long,Long>>();
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
+ for (Tuple2<Long,Long> prev : prevPaths) {
+ prevSet.add(prev);
+ }
+ for (Tuple2<Long,Long> next: nextPaths) {
+ if (!prevSet.contains(next)) {
+ out.collect(next);
+ }
+ }
+ }
+ });
+
DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index a6801f8..3b50c8b 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -281,7 +281,29 @@ under the License.
</execution>
-->
-
+
+ <execution>
+ <id>TransitiveClosureNaive</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <!-- Builds a separate jar with classifier TransitiveClosureNaive during the package phase. -->
+ <configuration>
+ <classifier>TransitiveClosureNaive</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
+ </manifestEntries>
+ </archive>
+ <!-- The program class is in the graph package, so its class files are matched there. -->
+ <includes>
+ <include>**/graph/TransitiveClosureNaive*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
</executions>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 858ce30..86d83db 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -1,3 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import org.apache.flink.example.java.graph.util.ConnectedComponentsData
+import org.apache.flink.util.Collector
+
+object TransitiveClosureNaive {
+ // Example program: builds paths by repeatedly joining the current paths with the edges inside
+ // iterateWithTermination(maxIterations); each step also returns the paths that are new in that step.
+ def main (args: Array[String]): Unit = {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val edges = getEdgesDataSet(env)
+
+ val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long,Long)] =>
+ // Extend every known path by one edge, keep the old paths, and collapse duplicate (source, target) pairs.
+ val nextPaths = prevPaths
+ .join(edges)
+ .where(1).equalTo(0) {
+ (left,right) => Some((left._1,right._2))
+ }
+ .union(prevPaths)
+ .groupBy(0,1)
+ .reduce((l,r) => l)
+ // Paths that occur in nextPaths but not in prevPaths, compared within groups of equal first field.
+ val terminate = prevPaths
+ .coGroup(nextPaths)
+ .where(0).equalTo(0) {
+ (prev, next, out: Collector[(Long, Long)]) => {
+ val knownPaths = prev.toSet // hash lookups instead of scanning a List for every candidate
+ for (n <- next)
+ if (!knownPaths.contains(n))
+ out.collect(n)
+ }
+ }
+ (nextPaths, terminate)
+ }
+
+ if (fileOutput)
+ paths.writeAsCsv(outputPath, "\n", " ")
+ else
+ paths.print()
+
+ env.execute("Scala Transitive Closure Example")
+
+
+ }
+
+ // Settings filled in by parseParameters; the defaults apply when no arguments are given.
+ private var fileOutput: Boolean = false
+ private var edgesPath: String = null
+ private var outputPath: String = null
+ private var maxIterations: Int = 10
+ // Expects either no arguments or exactly <edges path> <result path> <max iterations>; returns false after printing usage otherwise.
+ private def parseParameters(programArguments: Array[String]): Boolean = {
+ if (programArguments.length > 0) {
+ fileOutput = true
+ if (programArguments.length == 3) {
+ edgesPath = programArguments(0)
+ outputPath = programArguments(1)
+ maxIterations = Integer.parseInt(programArguments(2))
+ }
+ else {
+ System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+ return false
+ }
+ }
+ else {
+ System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.")
+ System.out.println(" Provide parameters to read input data from files.")
+ System.out.println(" See the documentation for the correct format of input files.")
+ System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>")
+ }
+ return true
+ }
+ // Reads space-delimited edges from edgesPath when file arguments were given, otherwise uses ConnectedComponentsData.EDGES.
+ private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
+ if (fileOutput) {
+ env.readCsvFile[(Long, Long)](
+ edgesPath,
+ fieldDelimiter = ' ',
+ includedFields = Array(0, 1))
+ .map { x => (x._1, x._2)}
+ }
+ else {
+ val edgeData = ConnectedComponentsData.EDGES map {
+ case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+ }
+ env.fromCollection(edgeData)
+ }
+ }
+}
+
+
+
+
+
+
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0f2440d/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
new file mode 100644
index 0000000..1bf25a6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+
+import java.io.BufferedReader;
+
+import org.apache.flink.examples.scala.graph.TransitiveClosureNaive;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.testdata.TransitiveClosureData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+ // Runs the Scala TransitiveClosureNaive example on generated edge data and checks the result it writes.
+ private static final long SEED = 0xBADC0FFEEBEEFL;
+
+ private static final int NUM_VERTICES = 1000;
+
+ private static final int NUM_EDGES = 10000;
+ // Paths assigned in preSubmit() and handed to the example's main method in testProgram().
+ private String edgesPath;
+ private String resultPath;
+
+ // Writes the generated edges to a temp file and picks the location for the results.
+ @Override
+ protected void preSubmit() throws Exception {
+ edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+ resultPath = getTempFilePath("results");
+ }
+ // Arguments are <edges path> <result path> <max number of iterations>.
+ @Override
+ protected void testProgram() throws Exception {
+ TransitiveClosureNaive.main(new String [] {edgesPath, resultPath, "5"});
+ }
+ // Passes every reader over the result path to TransitiveClosureData.checkOddEvenResult.
+ @Override
+ protected void postSubmit() throws Exception {
+ for (BufferedReader reader : getResultReader(resultPath)) {
+ TransitiveClosureData.checkOddEvenResult(reader);
+ }
+ }
+}
+