You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:43 UTC
[incubator-wayang] 05/11: [WAYANG-34] add Terasort just TeraGen
running
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 47e2ef5a8eb7e398f14cb768f259ba960944f985
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 14:00:44 2021 +0200
[WAYANG-34] add Terasort just TeraGen running
Signed-off-by: bertty <be...@gmail.com>
---
wayang-benchmark/pom.xml | 15 +
.../org/apache/wayang/apps/terasort/Random16.java | 376 +++++++++++++++++++++
.../apache/wayang/apps/terasort/Unsigned16.java | 299 ++++++++++++++++
.../org/apache/wayang/apps/terasort/TeraApp.scala | 138 ++++++++
.../org/apache/wayang/apps/terasort/TeraGen.scala | 111 ++++++
5 files changed, 939 insertions(+)
diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index 477c752..5ca2b06 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -71,6 +71,21 @@
<artifactId>wayang-sqlite3</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>2.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.mayor.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
</dependencies>
diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java
new file mode 100644
index 0000000..6e85d92
--- /dev/null
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.apps.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has already been generated,
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ * or 0x2360ed051fc65da44385df649fccf645
+ * and c is 98910279301475397889117759788405497857
+ * or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on 1.0 of rand16.c from Chris Nyberg
+ * <ch...@ordinal.com>.
+ *
+ * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Random16.java">Terasort Example</a>
+ */
+class Random16 {
+
+ /**
+ * The "Gen" array contain powers of 2 of the linear congruential generator.
+ * The index 0 struct contain the "a" coefficient and "c" constant for the
+ * generator. That is, the generator is:
+ * f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128
+ *
+ * All structs after the first contain an "a" and "c" that
+ * comprise the square of the previous function.
+ *
+ * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128
+ * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128
+ * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128
+ * ...
+
+ */
+ private static class RandomConstant {
+ final Unsigned16 a;
+ final Unsigned16 c;
+ public RandomConstant(String left, String right) {
+ a = new Unsigned16(left);
+ c = new Unsigned16(right);
+ }
+ }
+
+ private static final RandomConstant[] genArray = new RandomConstant[]{
+ /* [ 0] */ new RandomConstant("2360ed051fc65da44385df649fccf645",
+ "4a696d47726179524950202020202001"),
+ /* [ 1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
+ "95e0e48262b3edfe04479485c755b646"),
+ /* [ 2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
+ "882a02c315362b60765f100068b33a1c"),
+ /* [ 3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
+ "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+ /* [ 4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
+ "f25bd15439d16af594c1b1bafa6239f0"),
+ /* [ 5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
+ "89ca67c29c9397d59c612596145db7e0"),
+ /* [ 6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01",
+ "8b6ae036713bd578a8093c8eae5c7fc0"),
+ /* [ 7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01",
+ "98a2542fd23d0dbdff3b886cdb1d3f80"),
+ /* [ 8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
+ "954db923fdb7933e947cd1edcecb7f00"),
+ /* [ 9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
+ "00be4a36657c98cd204e8c8af7dafe00"),
+ /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
+ "991965329dccb28d581199ab18c5fc00"),
+ /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
+ "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+ /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
+ "2b657bbfd6ed9d632079e70c3c97f000"),
+ /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+ "59b60ee4c52fa49e9fe90682bd2fe000"),
+ /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
+ "cc099c88030679464fe86aae8a5fc000"),
+ /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001",
+ "06b9abff9f9f33dd30362c0154bf8000"),
+ /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
+ "e296707121688d5a0260b293a97f0000"),
+ /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
+ "189ffc4701ff23cb8f8acf6b52fe0000"),
+ /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
+ "5141110ab208fb9d61fb47e6a5fc0000"),
+ /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
+ "3c97caa62540f2948d8d340d4bf80000"),
+ /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
+ "1b25cb9cfe5a0c963174f91a97f00000"),
+ /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
+ "0c644570b4a487103c5436352fe00000"),
+ /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001",
+ "3d0589c28869472bde517c6a5fc00000"),
+ /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
+ "bc95e5ab36477e65534738d4bf800000"),
+ /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
+ "ddb02ff72a031c01011f71a97f000000"),
+ /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
+ "2561426086d9acdb6c82e352fe000000"),
+ /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
+ "64a788e3c118ed1c8215c6a5fc000000"),
+ /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
+ "e65ea321908627cfa86b8d4bf8000000"),
+ /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
+ "53d27225604d85f9e1d71a97f0000000"),
+ /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001",
+ "ca5ec7a3ed1fe55e07ae352fe0000000"),
+ /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001",
+ "4daebb2e085330651f5c6a5fc0000000"),
+ /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001",
+ "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+ /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001",
+ "158c62f2b31e496dfd71a97f00000000"),
+ /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001",
+ "290e84a2eb15fd1ffae352fe00000000"),
+ /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001",
+ "e3dc1bfbe991a34ff5c6a5fc00000000"),
+ /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
+ "ddf540d020b9eadfeb8d4bf800000000"),
+ /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
+ "8ee4950177ce66bfd71a97f000000000"),
+ /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001",
+ "39e0f787c907117fae352fe000000000"),
+ /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001",
+ "659d2522f7b732ff5c6a5fc000000000"),
+ /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001",
+ "9e8722938612a5feb8d4bf8000000000"),
+ /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001",
+ "e941a65d66b64bfd71a97f0000000000"),
+ /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001",
+ "7b50d19437b097fae352fe0000000000"),
+ /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001",
+ "59d7b68e18712ff5c6a5fc0000000000"),
+ /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001",
+ "4087bab2d5225feb8d4bf80000000000"),
+ /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
+ "b470abc03b44bfd71a97f00000000000"),
+ /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001",
+ "366630eaba897fae352fe00000000000"),
+ /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001",
+ "a2dfc77e8512ff5c6a5fc00000000000"),
+ /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001",
+ "1e0d25a14a25feb8d4bf800000000000"),
+ /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001",
+ "9d50a5d3944bfd71a97f000000000000"),
+ /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001",
+ "bf7ab5eb2897fae352fe000000000000"),
+ /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001",
+ "925b14e6512ff5c6a5fc000000000000"),
+ /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
+ "724cce0ca25feb8d4bf8000000000000"),
+ /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001",
+ "1af42d1944bfd71a97f0000000000000"),
+ /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001",
+ "0f529e32897fae352fe0000000000000"),
+ /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001",
+ "844e4c6512ff5c6a5fc0000000000000"),
+ /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+ "9f40d8ca25feb8d4bf80000000000000"),
+ /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+ "9912b1944bfd71a97f00000000000000"),
+ /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+ "9c69632897fae352fe00000000000000"),
+ /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+ "e1e2c6512ff5c6a5fc00000000000000"),
+ /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+ "68058ca25feb8d4bf800000000000000"),
+ /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+ "610b1944bfd71a97f000000000000000"),
+ /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+ "061632897fae352fe000000000000000"),
+ /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+ "1c2c6512ff5c6a5fc000000000000000"),
+ /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+ "7858ca25feb8d4bf8000000000000000"),
+ /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+ "f0b1944bfd71a97f0000000000000000"),
+ /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+ "e1632897fae352fe0000000000000000"),
+ /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+ "c2c6512ff5c6a5fc0000000000000000"),
+ /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+ "858ca25feb8d4bf80000000000000000"),
+ /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+ "0b1944bfd71a97f00000000000000000"),
+ /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+ "1632897fae352fe00000000000000000"),
+ /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+ "2c6512ff5c6a5fc00000000000000000"),
+ /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+ "58ca25feb8d4bf800000000000000000"),
+ /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+ "b1944bfd71a97f000000000000000000"),
+ /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+ "632897fae352fe000000000000000000"),
+ /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+ "c6512ff5c6a5fc000000000000000000"),
+ /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+ "8ca25feb8d4bf8000000000000000000"),
+ /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+ "1944bfd71a97f0000000000000000000"),
+ /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+ "32897fae352fe0000000000000000000"),
+ /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+ "6512ff5c6a5fc0000000000000000000"),
+ /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+ "ca25feb8d4bf80000000000000000000"),
+ /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+ "944bfd71a97f00000000000000000000"),
+ /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+ "2897fae352fe00000000000000000000"),
+ /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+ "512ff5c6a5fc00000000000000000000"),
+ /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+ "a25feb8d4bf800000000000000000000"),
+ /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+ "44bfd71a97f000000000000000000000"),
+ /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+ "897fae352fe000000000000000000000"),
+ /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+ "12ff5c6a5fc000000000000000000000"),
+ /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+ "25feb8d4bf8000000000000000000000"),
+ /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+ "4bfd71a97f0000000000000000000000"),
+ /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+ "97fae352fe0000000000000000000000"),
+ /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+ "2ff5c6a5fc0000000000000000000000"),
+ /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+ "5feb8d4bf80000000000000000000000"),
+ /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+ "bfd71a97f00000000000000000000000"),
+ /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+ "7fae352fe00000000000000000000000"),
+ /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+ "ff5c6a5fc00000000000000000000000"),
+ /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+ "feb8d4bf800000000000000000000000"),
+ /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+ "fd71a97f000000000000000000000000"),
+ /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+ "fae352fe000000000000000000000000"),
+ /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+ "f5c6a5fc000000000000000000000000"),
+ /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+ "eb8d4bf8000000000000000000000000"),
+ /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+ "d71a97f0000000000000000000000000"),
+ /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+ "ae352fe0000000000000000000000000"),
+ /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+ "5c6a5fc0000000000000000000000000"),
+ /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+ "b8d4bf80000000000000000000000000"),
+ /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+ "71a97f00000000000000000000000000"),
+ /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+ "e352fe00000000000000000000000000"),
+ /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+ "c6a5fc00000000000000000000000000"),
+ /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+ "8d4bf800000000000000000000000000"),
+ /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+ "1a97f000000000000000000000000000"),
+ /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+ "352fe000000000000000000000000000"),
+ /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+ "6a5fc000000000000000000000000000"),
+ /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+ "d4bf8000000000000000000000000000"),
+ /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+ "a97f0000000000000000000000000000"),
+ /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+ "52fe0000000000000000000000000000"),
+ /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+ "a5fc0000000000000000000000000000"),
+ /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+ "4bf80000000000000000000000000000"),
+ /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+ "97f00000000000000000000000000000"),
+ /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+ "2fe00000000000000000000000000000"),
+ /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+ "5fc00000000000000000000000000000"),
+ /* [119] */ new RandomConstant("de000000000000000000000000000001",
+ "bf800000000000000000000000000000"),
+ /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+ "7f000000000000000000000000000000"),
+ /* [121] */ new RandomConstant("78000000000000000000000000000001",
+ "fe000000000000000000000000000000"),
+ /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+ "fc000000000000000000000000000000"),
+ /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+ "f8000000000000000000000000000000"),
+ /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+ "f0000000000000000000000000000000"),
+ /* [125] */ new RandomConstant("80000000000000000000000000000001",
+ "e0000000000000000000000000000000"),
+ /* [126] */ new RandomConstant("00000000000000000000000000000001",
+ "c0000000000000000000000000000000"),
+ /* [127] */ new RandomConstant("00000000000000000000000000000001",
+ "80000000000000000000000000000000")};
+
+ /**
+ * generate the random number that is "advance" steps
+ * from an initial random number of 0. This is done by
+ * starting with 0, and then advancing the by the
+ * appropriate powers of 2 of the linear congruential
+ * generator.
+ */
+ public static Unsigned16 skipAhead(Unsigned16 advance) {
+ Unsigned16 result = new Unsigned16();
+ long bit_map;
+
+ bit_map = advance.getLow8();
+ for (int i = 0; bit_map != 0 && i < 64; i++) {
+ if ((bit_map & (1L << i)) != 0) {
+ /* advance random number by f**(2**i) (x)
+ */
+ result.multiply(genArray[i].a);
+ result.add(genArray[i].c);
+ bit_map &= ~(1L << i);
+ }
+ }
+ bit_map = advance.getHigh8();
+ for (int i = 0; bit_map != 0 && i < 64; i++)
+ {
+ if ((bit_map & (1L << i)) != 0) {
+ /* advance random number by f**(2**(i + 64)) (x)
+ */
+ result.multiply(genArray[i+64].a);
+ result.add(genArray[i+64].c);
+ bit_map &= ~(1L << i);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Generate the next 16 byte random number.
+ */
+ public static void nextRand(Unsigned16 rand) {
+ /* advance the random number forward once using the linear congruential
+ * generator, and then return the new random number
+ */
+ rand.multiply(genArray[0].a);
+ rand.add(genArray[0].c);
+ }
+}
diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
new file mode 100644
index 0000000..ae3b99e
--- /dev/null
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.apps.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An unsigned 16 byte integer class that supports addition, multiplication,
+ * and left shifts.
+ *
+ * * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a>
+ */
+class Unsigned16 implements Writable {
+ private long hi8;
+ private long lo8;
+
+ public Unsigned16() {
+ hi8 = 0;
+ lo8 = 0;
+ }
+
+ public Unsigned16(long l) {
+ hi8 = 0;
+ lo8 = l;
+ }
+
+ public Unsigned16(Unsigned16 other) {
+ hi8 = other.hi8;
+ lo8 = other.lo8;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Unsigned16) {
+ Unsigned16 other = (Unsigned16) o;
+ return other.hi8 == hi8 && other.lo8 == lo8;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) lo8;
+ }
+
+ /**
+ * Parse a hex string
+ * @param s the hex string
+ */
+ public Unsigned16(String s) throws NumberFormatException {
+ set(s);
+ }
+
+ /**
+ * Set the number from a hex string
+ * @param s the number in hexadecimal
+ * @throws NumberFormatException if the number is invalid
+ */
+ public void set(String s) throws NumberFormatException {
+ hi8 = 0;
+ lo8 = 0;
+ final long lastDigit = 0xfl << 60;
+ for (int i = 0; i < s.length(); ++i) {
+ int digit = getHexDigit(s.charAt(i));
+ if ((lastDigit & hi8) != 0) {
+ throw new NumberFormatException(s + " overflowed 16 bytes");
+ }
+ hi8 <<= 4;
+ hi8 |= (lo8 & lastDigit) >>> 60;
+ lo8 <<= 4;
+ lo8 |= digit;
+ }
+ }
+
+ /**
+ * Set the number to a given long.
+ * @param l the new value, which is treated as an unsigned number
+ */
+ public void set(long l) {
+ lo8 = l;
+ hi8 = 0;
+ }
+
+ /**
+ * Map a hexadecimal character into a digit.
+ * @param ch the character
+ * @return the digit from 0 to 15
+ * @throws NumberFormatException
+ */
+ private static int getHexDigit(char ch) throws NumberFormatException {
+ if (ch >= '0' && ch <= '9') {
+ return ch - '0';
+ }
+ if (ch >= 'a' && ch <= 'f') {
+ return ch - 'a' + 10;
+ }
+ if (ch >= 'A' && ch <= 'F') {
+ return ch - 'A' + 10;
+ }
+ throw new NumberFormatException(ch + " is not a valid hex digit");
+ }
+
+ private static final Unsigned16 TEN = new Unsigned16(10);
+
+ public static Unsigned16 fromDecimal(String s) throws NumberFormatException {
+ Unsigned16 result = new Unsigned16();
+ Unsigned16 tmp = new Unsigned16();
+ for(int i=0; i < s.length(); i++) {
+ char ch = s.charAt(i);
+ if (ch < '0' || ch > '9') {
+ throw new NumberFormatException(ch + " not a valid decimal digit");
+ }
+ int digit = ch - '0';
+ result.multiply(TEN);
+ tmp.set(digit);
+ result.add(tmp);
+ }
+ return result;
+ }
+
+ /**
+ * Return the number as a hex string.
+ */
+ public String toString() {
+ if (hi8 == 0) {
+ return Long.toHexString(lo8);
+ } else {
+ StringBuilder result = new StringBuilder();
+ result.append(Long.toHexString(hi8));
+ String loString = Long.toHexString(lo8);
+ for(int i=loString.length(); i < 16; ++i) {
+ result.append('0');
+ }
+ result.append(loString);
+ return result.toString();
+ }
+ }
+
+ /**
+ * Get a given byte from the number.
+ * @param b the byte to get with 0 meaning the most significant byte
+ * @return the byte or 0 if b is outside of 0..15
+ */
+ public byte getByte(int b) {
+ if (b >= 0 && b < 16) {
+ if (b < 8) {
+ return (byte) (hi8 >> (56 - 8*b));
+ } else {
+ return (byte) (lo8 >> (120 - 8*b));
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Get the hexadecimal digit at the given position.
+ * @param p the digit position to get with 0 meaning the most significant
+ * @return the character or '0' if p is outside of 0..31
+ */
+ public char getHexDigit(int p) {
+ byte digit = getByte(p / 2);
+ if (p % 2 == 0) {
+ digit >>>= 4;
+ }
+ digit &= 0xf;
+ if (digit < 10) {
+ return (char) ('0' + digit);
+ } else {
+ return (char) ('A' + digit - 10);
+ }
+ }
+
+ /**
+ * Get the high 8 bytes as a long.
+ */
+ public long getHigh8() {
+ return hi8;
+ }
+
+ /**
+ * Get the low 8 bytes as a long.
+ */
+ public long getLow8() {
+ return lo8;
+ }
+
+ /**
+ * Multiple the current number by a 16 byte unsigned integer. Overflow is not
+ * detected and the result is the low 16 bytes of the result. The numbers
+ * are divided into 32 and 31 bit chunks so that the product of two chucks
+ * fits in the unsigned 63 bits of a long.
+ * @param b the other number
+ */
+ void multiply(Unsigned16 b) {
+ // divide the left into 4 32 bit chunks
+ long[] left = new long[4];
+ left[0] = lo8 & 0xffffffffl;
+ left[1] = lo8 >>> 32;
+ left[2] = hi8 & 0xffffffffl;
+ left[3] = hi8 >>> 32;
+ // divide the right into 5 31 bit chunks
+ long[] right = new long[5];
+ right[0] = b.lo8 & 0x7fffffffl;
+ right[1] = (b.lo8 >>> 31) & 0x7fffffffl;
+ right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2);
+ right[3] = (b.hi8 >>> 29) & 0x7fffffffl;
+ right[4] = (b.hi8 >>> 60);
+ // clear the cur value
+ set(0);
+ Unsigned16 tmp = new Unsigned16();
+ for(int l=0; l < 4; ++l) {
+ for (int r=0; r < 5; ++r) {
+ long prod = left[l] * right[r];
+ if (prod != 0) {
+ int off = l*32 + r*31;
+ tmp.set(prod);
+ tmp.shiftLeft(off);
+ add(tmp);
+ }
+ }
+ }
+ }
+
+ /**
+ * Add the given number into the current number.
+ * @param b the other number
+ */
+ public void add(Unsigned16 b) {
+ long sumHi;
+ long sumLo;
+ long reshibit, hibit0, hibit1;
+
+ sumHi = hi8 + b.hi8;
+
+ hibit0 = (lo8 & 0x8000000000000000L);
+ hibit1 = (b.lo8 & 0x8000000000000000L);
+ sumLo = lo8 + b.lo8;
+ reshibit = (sumLo & 0x8000000000000000L);
+ if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0))
+ sumHi++; /* add carry bit */
+ hi8 = sumHi;
+ lo8 = sumLo;
+ }
+
+ /**
+ * Shift the number a given number of bit positions. The number is the low
+ * order bits of the result.
+ * @param bits the bit positions to shift by
+ */
+ public void shiftLeft(int bits) {
+ if (bits != 0) {
+ if (bits < 64) {
+ hi8 <<= bits;
+ hi8 |= (lo8 >>> (64 - bits));
+ lo8 <<= bits;
+ } else if (bits < 128) {
+ hi8 = lo8 << (bits - 64);
+ lo8 = 0;
+ } else {
+ hi8 = 0;
+ lo8 = 0;
+ }
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ hi8 = in.readLong();
+ lo8 = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(hi8);
+ out.writeLong(lo8);
+ }
+
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
new file mode 100644
index 0000000..f86a86e
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -0,0 +1,138 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper}
+import org.apache.wayang.apps.wordcount.WordCountScala
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.util.fs.FileSystems
+
+import java.util.regex.Pattern
+import scala.util.matching.Regex
+
+object TeraApp extends ExperimentDescriptor {
+
+ val KEY_LEN = 10
+ val VALUE_LEN = 100
+ val RECORD_LEN : Int = KEY_LEN + VALUE_LEN
+
+ override def version = "0.1.0"
+
+ def main(args: Array[String]) {
+ // Parse args.
+ if (args.isEmpty) {
+ println(s"Usage: " +
+ s"${Parameters.experimentHelp} " +
+ s"<plugin(,plugin)*> " +
+ s"<task could be[generate|sort|validate]> " +
+ s"<file size ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T])> " +
+ s"<partitions>" +
+ s"<input file if not value is null> " +
+ s"<output file if not value is null>")
+ sys.exit(1)
+ }
+ implicit val configuration = new Configuration
+ implicit val experiment = Parameters.createExperiment(args(0), this)
+ val plugins = Parameters.loadPlugins(args(1))
+ experiment.getSubject.addConfiguration("plugins", args(1))
+ val task = args(2)
+ experiment.getSubject.addConfiguration("task", task)
+ val fileSize = sizeStrToBytes(args(3))
+ experiment.getSubject.addConfiguration("fileSize", fileSize)
+ val partitions = args(4).toInt
+ experiment.getSubject.addConfiguration("partitions", partitions)
+ val input_file = if(args(5).equals("null")) null else args(5)
+ val output_file = if(args.length >= 5){ if(args(6).equals("null")) null else args(6) } else null
+ experiment.getSubject.addConfiguration("inputFile", input_file)
+ experiment.getSubject.addConfiguration("outputFile", output_file)
+
+ task match {
+ case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
+ case "sort" => null
+ case "validate" => null
+ }
+
+
+ // Run wordCount.
+// val wordCount = new WordCountScala(plugins: _*)
+// val words =
+// (if (wordsPerLine != null) {
+// wordCount(inputFile, wordsPerLine)
+// } else {
+// wordCount(inputFile)
+// }).toSeq.sortBy(-_._2)
+//
+// // Store experiment data.
+// val inputFileSize = FileSystems.getFileSize(inputFile)
+// if (inputFileSize.isPresent) experiment.getSubject.addConfiguration("inputSize", inputFileSize.getAsLong)
+// ProfileDBHelper.store(experiment, configuration)
+//
+// // Print results.
+// println(s"Found ${words.size} words:")
+// words.take(10).foreach(wc => println(s"${wc._2}x ${wc._1}"))
+// if (words.size > 10) print(s"${words.size - 10} more...")
+
+
+ }
+
+ /**
+ * Convert the string format ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T]) to the
+ * number on bytes
+ *
+ * B = Bytes
+ * k|K = Kilobytes (1_024 Bytes)
+ * m|M = Megabytes (1_048_576 Bytes)
+ * g|G = Gigabytes (1_073_741_824 Bytes)
+ * t|T = Terabytes (1_099_511_627_776 Bytes)
+ *
+ * @param str in the format
+ * @return number equivalent to the byte
+ */
+ def sizeStrToBytes(str: String): Long = {
+ val reg = "(\\d+(\\.\\d+)?)([B|k|K|m|M|g|G|t|T])"
+ val groups = Pattern.compile(reg).matcher(str)
+ groups.find()
+
+ val number_part:Double = groups.group(1).toDouble
+ val letter_part:String = groups.group(3)
+
+ val conversion = letter_part match {
+ case "B" => 1L //2^0
+ case "k" => 1024L //2^10
+ case "K" => 1024L //2^10
+ case "m" => 1048576L //2^20
+ case "M" => 1048576L //2^20
+ case "g" => 1073741824L //2^30
+ case "G" => 1073741824L //2^30
+ case "t" => 1099511627776L //2^40
+ case "T" => 1099511627776L //2^40
+ case _ => 1L //2^0
+ }
+ (number_part * conversion).toLong
+ }
+
+ /**
+ * take a number that represent a size on bytes return the human readable version
+ *
+ * @param size number that represent the size
+ * @return human readable version of the size
+ */
+ def sizeToSizeStr(size: Long): String = {
+ val kbScale: Long = 1024L
+ val mbScale: Long = 1024L * kbScale
+ val gbScale: Long = 1024L * mbScale
+ val tbScale: Long = 1024L * gbScale
+
+ if (size > tbScale) {
+ size / tbScale + "TB"
+ } else if (size > gbScale) {
+ size / gbScale + "GB"
+ } else if (size > mbScale) {
+ size / mbScale + "MB"
+ } else if (size > kbScale) {
+ size / kbScale + "KB"
+ } else {
+ size + "B"
+ }
+ }
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala
new file mode 100644
index 0000000..89ac49d
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala
@@ -0,0 +1,111 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+class TeraGen(@transient plugins: Plugin*) extends Serializable {
+
+ def apply(output_url: String, file_size: Long, partitions: Int)
+ (implicit configuration: Configuration, experiment: Experiment) = {
+ val wayangCtx = new WayangContext(configuration)
+ plugins.foreach(wayangCtx.register)
+ val planBuilder = new PlanBuilder(wayangCtx)
+
+ val parts = partitions
+ val recordsPerPartition = file_size / TeraApp.VALUE_LEN / parts.toLong
+ val numRecords = recordsPerPartition * parts.toLong
+
+ assert(recordsPerPartition < Int.MaxValue, s"records per partition > ${Int.MaxValue}")
+
+ println("===========================================================================")
+ println("===========================================================================")
+ println(s"Input size: $file_size")
+ println(s"Total number of records: $numRecords")
+ println(s"Number of output partitions: $parts")
+ println("Number of records/output partition: " + (numRecords / parts))
+ println("===========================================================================")
+ println("===========================================================================")
+
+ planBuilder
+ .withJobName(s"Terasort generate ${file_size}")
+ .withExperiment(experiment)
+ .withUdfJarsOf(this.getClass)
+ .loadCollection(1 to parts)
+ .flatMap( index => {
+ val one = new Unsigned16(1)
+ val firstRecordNumber = new Unsigned16(index.toLong * recordsPerPartition.toLong)
+ val recordsToGenerate = new Unsigned16(recordsPerPartition)
+
+ val recordNumber = new Unsigned16(firstRecordNumber)
+ val lastRecordNumber = new Unsigned16(firstRecordNumber)
+ lastRecordNumber.add(recordsToGenerate)
+
+ val rand = Random16.skipAhead(firstRecordNumber)
+
+ Iterator.tabulate(recordsPerPartition.toInt) { offset =>
+ val rowBytes: Array[Byte] = new Array[Byte](TeraApp.RECORD_LEN)
+ val key = new Array[Byte](TeraApp.KEY_LEN)
+ val value = new Array[Byte](TeraApp.VALUE_LEN)
+ Random16.nextRand(rand)
+ generateRecord(rowBytes, rand, recordNumber)
+ recordNumber.add(one)
+ rowBytes.copyToArray(key, 0, TeraApp.KEY_LEN)
+ rowBytes.takeRight(TeraApp.VALUE_LEN).copyToArray(value, 0, TeraApp.VALUE_LEN)
+ (key, value)
+ }.toStream
+ })
+ .writeObjectFile(output_url)
+ }
+
+ /**
+ * Generate a binary record suitable for all sort benchmarks except PennySort.
+ *
+ * @param recBuf record to return
+ */
+ def generateRecord(recBuf: Array[Byte], rand: Unsigned16, recordNumber: Unsigned16): Unit = {
+ // Generate the 10-byte key using the high 10 bytes of the 128-bit random number
+ var i = 0
+ while (i < 10) {
+ recBuf(i) = rand.getByte(i)
+ i += 1
+ }
+
+ // Add 2 bytes of "break"
+ recBuf(10) = 0x00.toByte
+ recBuf(11) = 0x11.toByte
+
+ // Convert the 128-bit record number to 32 bits of ascii hexadecimal
+ // as the next 32 bytes of the record.
+ i = 0
+ while (i < 32) {
+ recBuf(12 + i) = recordNumber.getHexDigit(i).toByte
+ i += 1
+ }
+
+ // Add 4 bytes of "break" data
+ recBuf(44) = 0x88.toByte
+ recBuf(45) = 0x99.toByte
+ recBuf(46) = 0xAA.toByte
+ recBuf(47) = 0xBB.toByte
+
+ // Add 48 bytes of filler based on low 48 bits of random number
+ i = 0
+ while (i < 12) {
+ val v = rand.getHexDigit(20 + i).toByte
+ recBuf(48 + i * 4) = v
+ recBuf(49 + i * 4) = v
+ recBuf(50 + i * 4) = v
+ recBuf(51 + i * 4) = v
+ i += 1
+ }
+
+ // Add 4 bytes of "break" data
+ recBuf(96) = 0xCC.toByte
+ recBuf(97) = 0xDD.toByte
+ recBuf(98) = 0xEE.toByte
+ recBuf(99) = 0xFF.toByte
+ }
+
+}