You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:45 UTC

[incubator-wayang] 07/11: [WAYANG-34] add Terasort just TeraValidate.scala working

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 01753193d7e64d5f919b94a9e4cef29a6d357eb2
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 14:38:28 2021 +0200

    [WAYANG-34] add Terasort just TeraValidate.scala working
    
    Signed-off-by: bertty <be...@gmail.com>
---
 .../apache/wayang/apps/terasort/Unsigned16.java    |  3 +-
 .../org/apache/wayang/apps/terasort/TeraApp.scala  |  4 +-
 .../apache/wayang/apps/terasort/TeraValidate.scala | 82 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
index ae3b99e..57516b1 100644
--- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import java.io.Serializable;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.hadoop.io.Writable;
  *
  *  * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a>
  */
-class Unsigned16 implements Writable {
+class Unsigned16 implements Writable, Serializable {
   private long hi8;
   private long lo8;
 
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
index fa8f9d4..5023ec9 100644
--- a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -41,14 +41,14 @@ object TeraApp extends ExperimentDescriptor {
     val partitions = args(4).toInt
     experiment.getSubject.addConfiguration("partitions", partitions)
     val input_file = if(args(5).equals("null")) null else args(5)
-    val output_file = if(args.length >= 5){ if(args(6).equals("null")) null else args(6) } else null
+    val output_file = if(args.length > 6){ if(args(6).equals("null")) null else args(6) } else null
     experiment.getSubject.addConfiguration("inputFile", input_file)
     experiment.getSubject.addConfiguration("outputFile", output_file)
 
     task match {
       case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
       case "sort" => new TeraSort(plugins: _*).apply(input_file, output_file)
-      case "validate" => null
+      case "validate" => new TeraValidate(plugins: _*).apply(input_file)
     }
 
 
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala
new file mode 100644
index 0000000..829a0ba
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala
@@ -0,0 +1,82 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+import org.apache.hadoop.util.PureJavaCrc32
+import com.google.common.primitives.UnsignedBytes
+
+/**
+ * Validates TeraSort output: keys must be non-decreasing inside every partition and across
+ * consecutive partitions. Also prints the record count and the summed CRC32 checksum of keys.
+ */
+class TeraValidate(@transient plugins: Plugin*) extends Serializable {
+
+  /** Reads (key, value) byte-array pairs from `input_url`; throws AssertionError if unsorted. */
+  def apply(input_url: String)
+           (implicit configuration: Configuration, experiment: Experiment): Unit = {
+    val wayangCtx = new WayangContext(configuration)
+    plugins.foreach(wayangCtx.register)
+    val dataset = new PlanBuilder(wayangCtx)
+      .readObjectFile[Tuple2[Array[Byte], Array[Byte]]](input_url)
+
+    // One (checksum, first key, last key) triple per non-empty partition.
+    val partitionStats = dataset.mapPartitions(
+      records => {
+        val iter = records.iterator
+        val sum = new Unsigned16
+        val checksum = new Unsigned16
+        val crc32 = new PureJavaCrc32()
+        val cmp = UnsignedBytes.lexicographicalComparator()
+        val min = new Array[Byte](10)
+        val max = new Array[Byte](10)
+        var pos = 0L
+        var prev = new Array[Byte](10) // all-zero key: the smallest value under unsigned order
+
+        while (iter.hasNext) {
+          val key = iter.next()._1
+          assert(cmp.compare(key, prev) >= 0, "keys are not sorted within a partition")
+
+          crc32.reset()
+          crc32.update(key, 0, key.length)
+          checksum.set(crc32.getValue)
+          sum.add(checksum)
+
+          if (pos == 0) key.copyToArray(min, 0, 10)
+          pos += 1
+          prev = key
+        }
+        prev.copyToArray(max, 0, 10)
+        // An empty partition has no real min/max; its zeroed placeholders would fail the
+        // cross-partition check below and reset the running maximum, so emit nothing for it.
+        if (pos == 0) Stream.empty[(Unsigned16, Array[Byte], Array[Byte])]
+        else Iterator((sum, min, max)).toStream
+      }
+    ).collect()
+
+    val cmp = UnsignedBytes.lexicographicalComparator()
+    val numRecords = dataset.count.collect().head
+    val sum = new Unsigned16
+    partitionStats.foreach { case (partSum, _, _) => sum.add(partSum) }
+    println("num records: " + numRecords)
+    println("checksum: " + sum.toString)
+
+    var lastMax = new Array[Byte](10)
+    partitionStats.zipWithIndex.foreach { case ((_, min, max), i) =>
+      println(s"part $i")
+      println("lastMax " + unsigned(lastMax))
+      println("min " + unsigned(min))
+      println("max " + unsigned(max))
+      assert(cmp.compare(min, max) <= 0, "min > max")
+      assert(cmp.compare(lastMax, min) <= 0, "current partition min < last partition max")
+      lastMax = max
+    }
+    println("num records: " + numRecords)
+    println("checksum: " + sum.toString)
+    println("partitions are properly sorted")
+  }
+
+  /** Renders bytes as unsigned values (0-255) for printing. */
+  private def unsigned(bytes: Array[Byte]): Seq[Int] = bytes.toSeq.map(_ & 0xff)
+}