You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:45 UTC
[incubator-wayang] 07/11: [WAYANG-34] add Terasort just
TeraValidate.scala working
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 01753193d7e64d5f919b94a9e4cef29a6d357eb2
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 14:38:28 2021 +0200
[WAYANG-34] add Terasort just TeraValidate.scala working
Signed-off-by: bertty <be...@gmail.com>
---
.../apache/wayang/apps/terasort/Unsigned16.java | 3 +-
.../org/apache/wayang/apps/terasort/TeraApp.scala | 4 +-
.../apache/wayang/apps/terasort/TeraValidate.scala | 82 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 3 deletions(-)
diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
index ae3b99e..57516b1 100644
--- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.hadoop.io.Writable;
/**
@@ -30,7 +31,7 @@ import org.apache.hadoop.io.Writable;
*
* * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a>
*/
-class Unsigned16 implements Writable {
+class Unsigned16 implements Writable, Serializable {
private long hi8;
private long lo8;
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
index fa8f9d4..5023ec9 100644
--- a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -41,14 +41,14 @@ object TeraApp extends ExperimentDescriptor {
val partitions = args(4).toInt
experiment.getSubject.addConfiguration("partitions", partitions)
val input_file = if(args(5).equals("null")) null else args(5)
- val output_file = if(args.length >= 5){ if(args(6).equals("null")) null else args(6) } else null
+ val output_file = if(args.length > 6){ if(args(6).equals("null")) null else args(6) } else null
experiment.getSubject.addConfiguration("inputFile", input_file)
experiment.getSubject.addConfiguration("outputFile", output_file)
task match {
case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
case "sort" => new TeraSort(plugins: _*).apply(input_file, output_file)
- case "validate" => null
+ case "validate" => new TeraValidate(plugins: _*).apply(input_file)
}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala
new file mode 100644
index 0000000..829a0ba
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala
@@ -0,0 +1,82 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+import org.apache.hadoop.util.PureJavaCrc32
+import com.google.common.primitives.UnsignedBytes
+
+/** Validates TeraSort output: keys must ascend inside every partition and across partitions.
+  *
+  * @param plugins plugins registered on the [[WayangContext]] that runs the validation
+  */
+class TeraValidate(@transient plugins: Plugin*) extends Serializable {
+  /** Reads the (key, value) records at `input_url`, prints their count and checksum, and asserts
+    * that keys ascend (unsigned lexicographic order) within each partition and across partitions.
+    */
+  def apply(input_url: String)
+           (implicit configuration: Configuration, experiment: Experiment) = {
+
+    val wayangCtx = new WayangContext(configuration)
+    plugins.foreach(wayangCtx.register)
+    val planBuilder = new PlanBuilder(wayangCtx)
+    val dataset = planBuilder
+      .readObjectFile[Tuple2[Array[Byte], Array[Byte]]](input_url)
+
+    // One summary per partition: (sum of key CRC32s, first key, last key, record count).
+    val output = dataset.mapPartitions(
+      iterable_element => {
+        val iter = iterable_element.iterator
+        val sum = new Unsigned16
+        val checksum = new Unsigned16
+        val crc32 = new PureJavaCrc32()
+        val min = new Array[Byte](10)
+        val max = new Array[Byte](10)
+        val cmp = UnsignedBytes.lexicographicalComparator()
+        var pos = 0L
+        var prev = new Array[Byte](10)
+
+        while (iter.hasNext) {
+          val key = iter.next()._1
+          assert(cmp.compare(key, prev) >= 0, "key < previous key within a partition")
+
+          crc32.reset()
+          crc32.update(key, 0, key.length)
+          checksum.set(crc32.getValue)
+          sum.add(checksum)
+
+          if (pos == 0) key.copyToArray(min, 0, 10)
+          pos += 1
+          prev = key
+        }
+        prev.copyToArray(max, 0, 10)
+        Iterator((sum, min, max, pos)).toStream
+      }
+    )
+
+    val checksumOutput = output.collect()
+    val cmp = UnsignedBytes.lexicographicalComparator()
+    val sum = new Unsigned16
+    val numRecords = dataset.count.collect().head
+    checksumOutput.foreach { case (partSum, _, _, _) => sum.add(partSum) }
+    println("num records: " + numRecords)
+    println("checksum: " + sum.toString)
+    var lastMax = new Array[Byte](10)
+    checksumOutput.zipWithIndex.foreach { case ((_, min, max, count), i) =>
+      println(s"part $i")
+      println(s"lastMax" + lastMax.toSeq.map(x => if (x < 0) 256 + x else x))
+      println(s"min " + min.toSeq.map(x => if (x < 0) 256 + x else x))
+      println(s"max " + max.toSeq.map(x => if (x < 0) 256 + x else x))
+      // An empty partition reports all-zero bounds; it must neither fail nor reset the check.
+      if (count > 0) {
+        assert(cmp.compare(min, max) <= 0, "min > max")
+        assert(cmp.compare(lastMax, min) <= 0, "current partition min < last partition max")
+        lastMax = max
+      }
+    }
+    println("num records: " + numRecords)
+    println("checksum: " + sum.toString)
+    println("partitions are properly sorted")
+  }
+}