You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/06/13 10:41:28 UTC

[incubator-wayang] branch benchmark created (now 4d51a731)

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a change to branch benchmark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


      at 4d51a731 [LICENCE] add missing licence

This branch includes the following new commits:

     new 3a778b5d [WAYANG-BENCH] add Grep to the benchmark
     new 15ab0ffa [WAYANG-FLINK] Correction in the TextFileSink in Flink
     new 4d51a731 [LICENCE] add missing licence

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-wayang] 01/03: [WAYANG-BENCH] add Grep to the benchmark

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch benchmark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 3a778b5d7850d5c8db1e2887edc1da6769c58993
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue May 17 11:20:18 2022 +0200

    [WAYANG-BENCH] add Grep to the benchmark
    
    Signed-off-by: bertty <be...@apache.org>
---
 .../java/org/apache/wayang/apps/grep/Grep.java     | 133 +++++++++++++++++++++
 wayang-benchmark/wayang-benchmark_2.12/pom.xml     |  10 ++
 2 files changed, 143 insertions(+)

diff --git a/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java b/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java
new file mode 100644
index 00000000..6781b08d
--- /dev/null
+++ b/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java
@@ -0,0 +1,133 @@
+package org.apache.wayang.apps.grep;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.plugin.Plugin;
+import org.apache.wayang.flink.Flink;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+/**
+ * Micro-benchmark that runs the same "grep" job (keep every line containing
+ * {@code "six"}) on several back ends and prints the elapsed time.
+ *
+ * <p>Back ends selectable from {@link #main(String...)}: the system
+ * {@code grep} binary, plain Java streams, native Spark, native Flink, and
+ * Wayang with the Java, Spark or Flink plugin.
+ */
+public class Grep implements Serializable {
+
+  /** Substring that every retained line must contain. */
+  private static final String NEEDLE = "six";
+
+  /**
+   * Filters {@code input} with plain Java streams and writes the matching
+   * lines to {@code output}.
+   *
+   * @param input path of the text file to read
+   * @param output path of the file to write
+   * @throws IOException if the input cannot be opened or the output cannot be written
+   */
+  public static void pureJava(String input, String output) throws IOException {
+    // Files.lines keeps the input file open until the stream is closed, so
+    // the stream is scoped with try-with-resources.
+    try (java.util.stream.Stream<String> lines = Files.lines(Paths.get(input))) {
+      Iterator<CharSequence> matches = lines
+          .filter(line -> line.contains(NEEDLE))
+          .map(line -> (CharSequence) line)
+          .iterator();
+
+      // Single-use Iterable: it hands out the same iterator on every call,
+      // which is enough here because it is consumed by one Files.write call.
+      Iterable<CharSequence> once = () -> matches;
+      Files.write(Paths.get(output), once);
+    }
+  }
+
+  /**
+   * Filters {@code input} with a local Spark context and saves the matching
+   * lines under {@code output}.
+   *
+   * @param input URL of the text file to read
+   * @param output URL of the location Spark writes to
+   * @throws IOException declared for symmetry with {@link #pureJava}
+   */
+  public static void pureSpark(String input, String output) throws IOException {
+    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("grep");
+    SparkContext context = new SparkContext(conf);
+
+    try {
+      context
+          .textFile(input, context.defaultParallelism())
+          .toJavaRDD()
+          .filter(line -> line.contains(NEEDLE))
+          .saveAsTextFile(output);
+    } finally {
+      // Only one SparkContext may be active per JVM; stop it so that this
+      // method can be called again and its resources are released.
+      context.stop();
+    }
+  }
+
+  /**
+   * Filters {@code input} with Flink's batch API and writes the matching
+   * lines to {@code output}.
+   *
+   * @param input path of the text file to read
+   * @param output path Flink writes to
+   * @throws Exception if the Flink job fails
+   */
+  public static void pureFlink(String input, String output) throws Exception {
+    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+    env.readTextFile(input)
+        .filter(line -> line.contains(NEEDLE))
+        .writeAsText(output);
+
+    // The sink above is only declared; execute() actually runs the job.
+    env.execute();
+  }
+
+  /**
+   * Runs the grep job as a Wayang plan restricted to the given plugin.
+   *
+   * @param input URL of the text file to read
+   * @param output URL of the location to write to
+   * @param plugin Wayang plugin that is registered with the context
+   */
+  public static void wayangPlatform(String input, String output, Plugin plugin){
+    WayangContext wayangContext = new WayangContext().with(plugin);
+
+    new JavaPlanBuilder(wayangContext)
+        .readTextFile(input)
+        // The operator used to be labelled "Split words", a leftover that
+        // did not describe this filter.
+        .filter(line -> line.contains(NEEDLE)).withName("Filter lines")
+        // NOTE(review): the last argument is presumably the job name - confirm.
+        .writeTextFile(output, s -> s, "lala");
+  }
+
+  /** Runs the Wayang plan with {@code Java.basicPlugin()}. */
+  public static void wayangJava(String input, String output){
+    wayangPlatform(input, output, Java.basicPlugin());
+  }
+
+  /** Runs the Wayang plan with {@code Spark.basicPlugin()}. */
+  public static void wayangSpark(String input, String output){
+    wayangPlatform(input, output, Spark.basicPlugin());
+  }
+
+  /** Runs the Wayang plan with {@code Flink.basicPlugin()}. */
+  public static void wayangFlink(String input, String output){
+    wayangPlatform(input, output, Flink.basicPlugin());
+  }
+
+  /**
+   * Command-line entry point.
+   *
+   * <p>Arguments: {@code <size> <platform> <base-dir>}. The input file is
+   * {@code <base-dir>/python/src/pywy/tests/resources/10e<size>MB.input} and
+   * the output is {@code <base-dir>/lala.out}, which is deleted first.
+   *
+   * @param args size, platform name and base directory, in that order
+   * @throws IllegalArgumentException if arguments are missing or the platform is unknown
+   * @throws Exception if the clean-up or the selected back end fails
+   */
+  public static void main(String... args) throws Exception {
+    if (args.length < 3) {
+      throw new IllegalArgumentException("usage: Grep <size> <platform> <base-dir>");
+    }
+    int size = Integer.parseInt(args[0]);
+    String platform = args[1];
+    String baseDir = args[2];
+
+    String input = baseDir + "/python/src/pywy/tests/resources/10e" + size + "MB.input";
+    String output = baseDir + "/lala.out";
+
+    // Remove the previous result and wait for the removal to finish, so it
+    // cannot overlap with the timed run. Arguments are passed as a list so
+    // that paths containing spaces stay intact.
+    int rmStatus = runAndWait(new ProcessBuilder("rm", "-rf", output).inheritIO());
+    if (rmStatus != 0) {
+      throw new IOException("could not remove " + output + " (rm exited with " + rmStatus + ")");
+    }
+
+    // nanoTime is monotonic, unlike the wall clock.
+    long start = System.nanoTime();
+    runPlatform(platform, input, output);
+    double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
+
+    System.out.println(
+      String.format(
+        "the platform %s took %f s",
+        platform,
+        seconds
+      )
+    );
+  }
+
+  /** Dispatches to the back end selected by {@code platform}. */
+  private static void runPlatform(String platform, String input, String output)
+      throws Exception {
+    switch (platform){
+      case "so":
+        systemGrep(input, output);
+        break;
+      case "pure-java":
+        Grep.pureJava(input, output);
+        break;
+      case "pure-spark":
+        Grep.pureSpark("file://"+input, "file://"+output);
+        break;
+      case "pure-flink":
+        Grep.pureFlink(input, output);
+        break;
+      case "wayang-java":
+        Grep.wayangJava("file://"+input, "file://"+output);
+        break;
+      case "wayang-spark":
+        Grep.wayangSpark("file://"+input, "file://"+output);
+        break;
+      case "wayang-flink":
+        // NOTE(review): unlike the other Wayang back ends, the output is
+        // passed without the file:// scheme - confirm this is intentional.
+        Grep.wayangFlink("file://"+input, output);
+        break;
+      default:
+        // Fail loudly instead of reporting a timing for a run that did nothing.
+        throw new IllegalArgumentException("unknown platform: " + platform);
+    }
+  }
+
+  /**
+   * Runs the system {@code grep} binary and waits for it to finish.
+   *
+   * <p>No shell is involved, so the output redirection is done by
+   * {@link ProcessBuilder} rather than with {@code >}.
+   */
+  private static void systemGrep(String input, String output)
+      throws IOException, InterruptedException {
+    ProcessBuilder grep = new ProcessBuilder("grep", NEEDLE, input)
+        .redirectOutput(new java.io.File(output))
+        .redirectError(ProcessBuilder.Redirect.INHERIT);
+
+    int status = runAndWait(grep);
+    // grep exits with 1 when no line matched; only higher codes are errors.
+    if (status > 1) {
+      throw new IOException("grep exited with status " + status);
+    }
+  }
+
+  /** Starts the command, blocks until it terminates and returns its exit status. */
+  private static int runAndWait(ProcessBuilder command)
+      throws IOException, InterruptedException {
+    return command.start().waitFor();
+  }
+}
diff --git a/wayang-benchmark/wayang-benchmark_2.12/pom.xml b/wayang-benchmark/wayang-benchmark_2.12/pom.xml
index e13c098a..1bdaf905 100644
--- a/wayang-benchmark/wayang-benchmark_2.12/pom.xml
+++ b/wayang-benchmark/wayang-benchmark_2.12/pom.xml
@@ -48,6 +48,16 @@
       <artifactId>spark-graphx_2.12</artifactId>
       <version>${spark.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>1.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-flink_2.12</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 
 


[incubator-wayang] 02/03: [WAYANG-FLINK] Correction in the TextFileSink in Flink

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch benchmark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 15ab0ffa69061b5b2cfcb47690fc81ad0733450b
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue May 17 11:22:09 2022 +0200

    [WAYANG-FLINK] Correction in the TextFileSink in Flink
    
    Signed-off-by: bertty <be...@apache.org>
---
 .../java/org/apache/wayang/flink/operators/FlinkTextFileSink.java  | 7 ++++---
 .../code/main/resources/wayang-spark-defaults.properties           | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
index 96823444..5cc979f7 100644
--- a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
+++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
@@ -65,9 +65,10 @@ public class FlinkTextFileSink<Type> extends TextFileSink<Type> implements Flink
 
         DataSet<Type> inputDataset = ((DataSetChannel.Instance) inputs[0]).provideDataSet();
 
-        final TextOutputFormat.TextFormatter<Type> fileOutputFormat = flinkExecutor.getCompiler().compileOutput(this.formattingDescriptor);
-
-        inputDataset.writeAsFormattedText(this.textFileUrl, fileOutputFormat);
+        inputDataset.writeAsText(this.textFileUrl);
+//        final TextOutputFormat.TextFormatter<Type> fileOutputFormat = flinkExecutor.getCompiler().compileOutput(this.formattingDescriptor);
+//
+//        inputDataset.writeAsFormattedText(this.textFileUrl, fileOutputFormat);
 
         return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
     }
diff --git a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
index 7b4feb3f..262fcbe2 100644
--- a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
+++ b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-spark.master = local[1]
+spark.master = local[*]
 spark.app.name = Wayang App
 spark.ui.showConsoleProgress = false
 spark.driver.allowMultipleContexts=true


[incubator-wayang] 03/03: [LICENCE] add missing licence

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch benchmark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 4d51a73156d2a20d4aa97f97b98f19900283e39d
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Jun 13 12:41:05 2022 +0200

    [LICENCE] add missing licence
    
    Signed-off-by: bertty <be...@apache.org>
---
 .../main/java/org/apache/wayang/apps/grep/Grep.java   | 19 ++++++++++++++++++-
 .../main/resources/wayang-spark-defaults.properties   |  2 +-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java b/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java
index 6781b08d..35d483cf 100644
--- a/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java
+++ b/wayang-benchmark/code/main/java/org/apache/wayang/apps/grep/Grep.java
@@ -1,9 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.wayang.apps.grep;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Iterator;
 import org.apache.flink.api.java.ExecutionEnvironment;
diff --git a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
index 262fcbe2..7b4feb3f 100644
--- a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
+++ b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-spark.master = local[*]
+spark.master = local[1]
 spark.app.name = Wayang App
 spark.ui.showConsoleProgress = false
 spark.driver.allowMultipleContexts=true