You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:43 UTC

[incubator-wayang] 05/11: [WAYANG-34] add Terasort just TeraGen running

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 47e2ef5a8eb7e398f14cb768f259ba960944f985
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 14:00:44 2021 +0200

    [WAYANG-34] add Terasort just TeraGen running
    
    Signed-off-by: bertty <be...@gmail.com>
---
 wayang-benchmark/pom.xml                           |  15 +
 .../org/apache/wayang/apps/terasort/Random16.java  | 376 +++++++++++++++++++++
 .../apache/wayang/apps/terasort/Unsigned16.java    | 299 ++++++++++++++++
 .../org/apache/wayang/apps/terasort/TeraApp.scala  | 138 ++++++++
 .../org/apache/wayang/apps/terasort/TeraGen.scala  | 111 ++++++
 5 files changed, 939 insertions(+)

diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index 477c752..5ca2b06 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -71,6 +71,21 @@
       <artifactId>wayang-sqlite3</artifactId>
       <version>0.6.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>2.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.mayor.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
   </dependencies>
 
 
diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java
new file mode 100644
index 0000000..6e85d92
--- /dev/null
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.apps.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has already been generated),
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ *            or 0x2360ed051fc65da44385df649fccf645
+ *   and c is 98910279301475397889117759788405497857
+ *            or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on 1.0 of rand16.c from Chris Nyberg 
+ * <ch...@ordinal.com>.
+ *
+ * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Random16.java">Terasort Example</a>
+ */
+class Random16 {
+
+  /**
+   * The "Gen" array contains powers of 2 of the linear congruential generator.
+   * The index 0 struct contains the "a" coefficient and "c" constant for the
+   * generator.  That is, the generator is:
+   *    f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128
+   *
+   * All structs after the first contain an "a" and "c" that
+   * comprise the square of the previous function.
+   *
+   * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128
+   * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128
+   * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128
+   * ...
+
+   */
+  private static class RandomConstant {
+    // Multiplier "a" and increment "c" of one affine step x -> a * x + c.
+    final Unsigned16 a;
+    final Unsigned16 c;
+
+    // Both arguments are hexadecimal strings, parsed by Unsigned16(String).
+    public RandomConstant(String left, String right) {
+      this.a = new Unsigned16(left);
+      this.c = new Unsigned16(right);
+    }
+  }
+
+  private static final RandomConstant[] genArray = new RandomConstant[]{
+      /* [  0] */ new RandomConstant("2360ed051fc65da44385df649fccf645",
+      "4a696d47726179524950202020202001"),
+      /* [  1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
+      "95e0e48262b3edfe04479485c755b646"),
+      /* [  2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
+      "882a02c315362b60765f100068b33a1c"),
+      /* [  3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
+      "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+      /* [  4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
+      "f25bd15439d16af594c1b1bafa6239f0"),
+      /* [  5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
+      "89ca67c29c9397d59c612596145db7e0"),
+      /* [  6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01",
+      "8b6ae036713bd578a8093c8eae5c7fc0"),
+      /* [  7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01",
+      "98a2542fd23d0dbdff3b886cdb1d3f80"),
+      /* [  8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
+      "954db923fdb7933e947cd1edcecb7f00"),
+      /* [  9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
+      "00be4a36657c98cd204e8c8af7dafe00"),
+      /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
+      "991965329dccb28d581199ab18c5fc00"),
+      /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
+      "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+      /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
+      "2b657bbfd6ed9d632079e70c3c97f000"),
+      /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+      "59b60ee4c52fa49e9fe90682bd2fe000"),
+      /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
+      "cc099c88030679464fe86aae8a5fc000"),
+      /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001",
+      "06b9abff9f9f33dd30362c0154bf8000"),
+      /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
+      "e296707121688d5a0260b293a97f0000"),
+      /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
+      "189ffc4701ff23cb8f8acf6b52fe0000"),
+      /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
+      "5141110ab208fb9d61fb47e6a5fc0000"),
+      /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
+      "3c97caa62540f2948d8d340d4bf80000"),
+      /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
+      "1b25cb9cfe5a0c963174f91a97f00000"),
+      /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
+      "0c644570b4a487103c5436352fe00000"),
+      /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001",
+      "3d0589c28869472bde517c6a5fc00000"),
+      /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
+      "bc95e5ab36477e65534738d4bf800000"),
+      /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
+      "ddb02ff72a031c01011f71a97f000000"),
+      /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
+      "2561426086d9acdb6c82e352fe000000"),
+      /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
+      "64a788e3c118ed1c8215c6a5fc000000"),
+      /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
+      "e65ea321908627cfa86b8d4bf8000000"),
+      /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
+      "53d27225604d85f9e1d71a97f0000000"),
+      /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001",
+      "ca5ec7a3ed1fe55e07ae352fe0000000"),
+      /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001",
+      "4daebb2e085330651f5c6a5fc0000000"),
+      /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001",
+      "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+      /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001",
+      "158c62f2b31e496dfd71a97f00000000"),
+      /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001",
+      "290e84a2eb15fd1ffae352fe00000000"),
+      /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001",
+      "e3dc1bfbe991a34ff5c6a5fc00000000"),
+      /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
+      "ddf540d020b9eadfeb8d4bf800000000"),
+      /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
+      "8ee4950177ce66bfd71a97f000000000"),
+      /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001",
+      "39e0f787c907117fae352fe000000000"),
+      /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001",
+      "659d2522f7b732ff5c6a5fc000000000"),
+      /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001",
+      "9e8722938612a5feb8d4bf8000000000"),
+      /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001",
+      "e941a65d66b64bfd71a97f0000000000"),
+      /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001",
+      "7b50d19437b097fae352fe0000000000"),
+      /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001",
+      "59d7b68e18712ff5c6a5fc0000000000"),
+      /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001",
+      "4087bab2d5225feb8d4bf80000000000"),
+      /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
+      "b470abc03b44bfd71a97f00000000000"),
+      /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001",
+      "366630eaba897fae352fe00000000000"),
+      /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001",
+      "a2dfc77e8512ff5c6a5fc00000000000"),
+      /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001",
+      "1e0d25a14a25feb8d4bf800000000000"),
+      /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001",
+      "9d50a5d3944bfd71a97f000000000000"),
+      /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001",
+      "bf7ab5eb2897fae352fe000000000000"),
+      /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001",
+      "925b14e6512ff5c6a5fc000000000000"),
+      /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
+      "724cce0ca25feb8d4bf8000000000000"),
+      /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001",
+      "1af42d1944bfd71a97f0000000000000"),
+      /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001",
+      "0f529e32897fae352fe0000000000000"),
+      /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001",
+      "844e4c6512ff5c6a5fc0000000000000"),
+      /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+      "9f40d8ca25feb8d4bf80000000000000"),
+      /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+      "9912b1944bfd71a97f00000000000000"),
+      /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+      "9c69632897fae352fe00000000000000"),
+      /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+      "e1e2c6512ff5c6a5fc00000000000000"),
+      /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+      "68058ca25feb8d4bf800000000000000"),
+      /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+      "610b1944bfd71a97f000000000000000"),
+      /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+      "061632897fae352fe000000000000000"),
+      /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+      "1c2c6512ff5c6a5fc000000000000000"),
+      /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+      "7858ca25feb8d4bf8000000000000000"),
+      /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+      "f0b1944bfd71a97f0000000000000000"),
+      /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+      "e1632897fae352fe0000000000000000"),
+      /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+      "c2c6512ff5c6a5fc0000000000000000"),
+      /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+      "858ca25feb8d4bf80000000000000000"),
+      /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+      "0b1944bfd71a97f00000000000000000"),
+      /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+      "1632897fae352fe00000000000000000"),
+      /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+      "2c6512ff5c6a5fc00000000000000000"),
+      /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+      "58ca25feb8d4bf800000000000000000"),
+      /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+      "b1944bfd71a97f000000000000000000"),
+      /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+      "632897fae352fe000000000000000000"),
+      /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+      "c6512ff5c6a5fc000000000000000000"),
+      /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+      "8ca25feb8d4bf8000000000000000000"),
+      /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+      "1944bfd71a97f0000000000000000000"),
+      /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+      "32897fae352fe0000000000000000000"),
+      /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+      "6512ff5c6a5fc0000000000000000000"),
+      /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+      "ca25feb8d4bf80000000000000000000"),
+      /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+      "944bfd71a97f00000000000000000000"),
+      /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+      "2897fae352fe00000000000000000000"),
+      /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+      "512ff5c6a5fc00000000000000000000"),
+      /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+      "a25feb8d4bf800000000000000000000"),
+      /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+      "44bfd71a97f000000000000000000000"),
+      /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+      "897fae352fe000000000000000000000"),
+      /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+      "12ff5c6a5fc000000000000000000000"),
+      /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+      "25feb8d4bf8000000000000000000000"),
+      /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+      "4bfd71a97f0000000000000000000000"),
+      /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+      "97fae352fe0000000000000000000000"),
+      /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+      "2ff5c6a5fc0000000000000000000000"),
+      /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+      "5feb8d4bf80000000000000000000000"),
+      /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+      "bfd71a97f00000000000000000000000"),
+      /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+      "7fae352fe00000000000000000000000"),
+      /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+      "ff5c6a5fc00000000000000000000000"),
+      /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+      "feb8d4bf800000000000000000000000"),
+      /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+      "fd71a97f000000000000000000000000"),
+      /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+      "fae352fe000000000000000000000000"),
+      /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+      "f5c6a5fc000000000000000000000000"),
+      /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+      "eb8d4bf8000000000000000000000000"),
+      /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+      "d71a97f0000000000000000000000000"),
+      /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+      "ae352fe0000000000000000000000000"),
+      /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+      "5c6a5fc0000000000000000000000000"),
+      /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+      "b8d4bf80000000000000000000000000"),
+      /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+      "71a97f00000000000000000000000000"),
+      /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+      "e352fe00000000000000000000000000"),
+      /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+      "c6a5fc00000000000000000000000000"),
+      /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+      "8d4bf800000000000000000000000000"),
+      /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+      "1a97f000000000000000000000000000"),
+      /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+      "352fe000000000000000000000000000"),
+      /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+      "6a5fc000000000000000000000000000"),
+      /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+      "d4bf8000000000000000000000000000"),
+      /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+      "a97f0000000000000000000000000000"),
+      /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+      "52fe0000000000000000000000000000"),
+      /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+      "a5fc0000000000000000000000000000"),
+      /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+      "4bf80000000000000000000000000000"),
+      /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+      "97f00000000000000000000000000000"),
+      /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+      "2fe00000000000000000000000000000"),
+      /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+      "5fc00000000000000000000000000000"),
+      /* [119] */ new RandomConstant("de000000000000000000000000000001",
+      "bf800000000000000000000000000000"),
+      /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+      "7f000000000000000000000000000000"),
+      /* [121] */ new RandomConstant("78000000000000000000000000000001",
+      "fe000000000000000000000000000000"),
+      /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+      "fc000000000000000000000000000000"),
+      /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+      "f8000000000000000000000000000000"),
+      /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+      "f0000000000000000000000000000000"),
+      /* [125] */ new RandomConstant("80000000000000000000000000000001",
+      "e0000000000000000000000000000000"),
+      /* [126] */ new RandomConstant("00000000000000000000000000000001",
+      "c0000000000000000000000000000000"),
+      /* [127] */ new RandomConstant("00000000000000000000000000000001",
+      "80000000000000000000000000000000")};
+
+  /**
+   *  generate the random number that is "advance" steps
+   *  from an initial random number of 0.  This is done by
+   *  starting with 0, and then advancing it by the
+   *  appropriate powers of 2 of the linear congruential
+   *  generator.
+   */
+  public static Unsigned16 skipAhead(Unsigned16 advance) {
+    Unsigned16 result = new Unsigned16();
+    long          bit_map;
+
+    bit_map = advance.getLow8();
+    for (int i = 0; bit_map != 0 && i < 64; i++) {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**i) (x)
+         */
+        result.multiply(genArray[i].a);
+        result.add(genArray[i].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    bit_map = advance.getHigh8();
+    for (int i = 0; bit_map != 0 && i < 64; i++)
+    {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**(i + 64)) (x)
+         */
+        result.multiply(genArray[i+64].a);
+        result.add(genArray[i+64].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Generate the next 16 byte random number.
+   */
+  public static void nextRand(Unsigned16 rand) {
+    // genArray[0] holds the base generator, so this performs a single step
+    // rand = a * rand + c. "rand" is updated in place; nothing is returned.
+    final RandomConstant base = genArray[0];
+    rand.multiply(base.a);
+    rand.add(base.c);
+  }
+}
diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
new file mode 100644
index 0000000..ae3b99e
--- /dev/null
+++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.apps.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An unsigned 16 byte integer class that supports addition, multiplication,
+ * and left shifts.
+ *
+ * Code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a>
+ */
+class Unsigned16 implements Writable {
+  private long hi8;
+  private long lo8;
+
+  public Unsigned16() {
+    // Start at zero: the long constructor clears the high half and stores 0 in the low half.
+    this(0L);
+  }
+
+  public Unsigned16(long l) {
+    // The long supplies the low 64 bits as-is; the high 64 bits are zero.
+    this.lo8 = l;
+    this.hi8 = 0L;
+  }
+
+  public Unsigned16(Unsigned16 other) {
+    // Copy constructor: duplicate both halves of "other".
+    this.lo8 = other.lo8;
+    this.hi8 = other.hi8;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    // Equal only to another Unsigned16 carrying the same 128-bit value.
+    if (!(o instanceof Unsigned16)) {
+      return false;
+    }
+    Unsigned16 that = (Unsigned16) o;
+    return hi8 == that.hi8 && lo8 == that.lo8;
+  }
+
+  @Override
+  public int hashCode() {
+    // Narrowing a long to int keeps its low 32 bits, so only those bits of
+    // lo8 contribute to the hash.
+    final long low = lo8;
+    return (int) low;
+  }
+
+  /**
+   * Parse a hex string
+   * @param s the hex string
+   */
+  public Unsigned16(String s) throws NumberFormatException {
+    // Parsing is delegated to set(String), which reads "s" as hexadecimal.
+    this.set(s);
+  }
+
+  /**
+   * Set the number from a hex string
+   * @param s the number in hexadecimal
+   * @throws NumberFormatException if the number is invalid
+   */
+  public void set(String s) throws NumberFormatException {
+    hi8 = 0;
+    lo8 = 0;
+    for (char ch : s.toCharArray()) {
+      // Validate the character first, so a bad digit is reported before overflow.
+      int nibble = getHexDigit(ch);
+      // A non-zero top nibble would be shifted out of the 128 bits below.
+      if ((hi8 >>> 60) != 0) {
+        throw new NumberFormatException(s + " overflowed 16 bytes");
+      }
+      // Shift the whole 128-bit value left by 4 bits, carrying the top nibble
+      // of lo8 into the bottom of hi8, then append the new digit.
+      hi8 = (hi8 << 4) | (lo8 >>> 60);
+      lo8 = (lo8 << 4) | nibble;
+    }
+  }
+
+  /**
+   * Set the number to a given long.
+   * @param l the new value, which is treated as an unsigned number
+   */
+  public void set(long l) {
+    // Overwrite the whole value: high half cleared, low half taken from "l".
+    this.hi8 = 0L;
+    this.lo8 = l;
+  }
+
+  /**
+   * Map a hexadecimal character into a digit.
+   * @param ch the character
+   * @return the digit from 0 to 15
+   * @throws NumberFormatException
+   */
+  private static int getHexDigit(char ch) throws NumberFormatException {
+    if (ch >= '0' && ch <= '9') {
+      return ch - '0';
+    }
+    if (ch >= 'a' && ch <= 'f') {
+      return ch - 'a' + 10;
+    }
+    if (ch >= 'A' && ch <= 'F') {
+      return ch - 'A' + 10;
+    }
+    throw new NumberFormatException(ch + " is not a valid hex digit");
+  }
+
+  private static final Unsigned16 TEN = new Unsigned16(10);
+
+  public static Unsigned16 fromDecimal(String s) throws NumberFormatException {
+    Unsigned16 value = new Unsigned16();
+    Unsigned16 digitHolder = new Unsigned16();
+    for (char ch : s.toCharArray()) {
+      if (!('0' <= ch && ch <= '9')) {
+        throw new NumberFormatException(ch + " not a valid decimal digit");
+      }
+      // Horner step: value = value * 10 + digit. multiply() does not detect
+      // overflow, so values that do not fit in 16 bytes wrap around silently.
+      value.multiply(TEN);
+      digitHolder.set(ch - '0');
+      value.add(digitHolder);
+    }
+    return value;
+  }
+
+  /**
+   * Return the number as a hex string.
+   */
+  public String toString() {
+    String lowHex = Long.toHexString(lo8);
+    if (hi8 == 0) {
+      // Value fits in the low half: no padding, no leading zeros.
+      return lowHex;
+    }
+    // With a non-zero high half, the low half is zero-padded to all 16 hex
+    // digits so that it lines up behind the high half.
+    StringBuilder text = new StringBuilder(Long.toHexString(hi8));
+    for (int pad = 16 - lowHex.length(); pad > 0; pad--) {
+      text.append('0');
+    }
+    return text.append(lowHex).toString();
+  }
+
+  /**
+   * Get a given byte from the number.
+   * @param b the byte to get with 0 meaning the most significant byte
+   * @return the byte or 0 if b is outside of 0..15
+   */
+  public byte getByte(int b) {
+    if (b >= 0 && b < 16) {
+      if (b < 8) {
+        return (byte) (hi8 >> (56 - 8*b));
+      } else {
+        return (byte) (lo8 >> (120 - 8*b));
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Get the hexadecimal digit at the given position.
+   * @param p the digit position to get with 0 meaning the most significant
+   * @return the character or '0' if p is outside of 0..31
+   */
+  public char getHexDigit(int p) {
+    // Reject out-of-range positions explicitly. Integer division truncates
+    // toward zero, so p == -1 would otherwise select byte 0 and return its
+    // low nibble instead of the documented '0'.
+    if (p < 0 || p > 31) {
+      return '0';
+    }
+    // Two hex digits per byte: even positions are the high nibble, odd
+    // positions the low nibble. Masking with 0xff drops the sign extension
+    // of the byte before the nibble is extracted.
+    int octet = getByte(p / 2) & 0xff;
+    int digit = (p % 2 == 0) ? (octet >>> 4) : (octet & 0xf);
+    // Digits 10-15 are rendered as upper-case letters.
+    return (char) (digit < 10 ? '0' + digit : 'A' + digit - 10);
+  }
+
+  /**
+   * Get the high 8 bytes as a long.
+   */
+  public long getHigh8() {
+    // Raw bit pattern of the upper half; a set top bit reads back as a negative long.
+    return this.hi8;
+  }
+
+  /**
+   * Get the low 8 bytes as a long.
+   */
+  public long getLow8() {
+    // Raw bit pattern of the lower half; a set top bit reads back as a negative long.
+    return this.lo8;
+  }
+
+  /**
+   * Multiply the current number by a 16 byte unsigned integer. Overflow is not
+   * detected and the result is the low 16 bytes of the result. The numbers
+   * are divided into 32 and 31 bit chunks so that the product of two chunks
+   * fits in the unsigned 63 bits of a long.
+   * @param b the other number
+   */
+  void multiply(Unsigned16 b) {
+    // divide the left into 4 32 bit chunks
+    long[] left = new long[4];
+    left[0] = lo8 & 0xffffffffl;
+    left[1] = lo8 >>> 32;
+    left[2] = hi8 & 0xffffffffl;
+    left[3] = hi8 >>> 32;
+    // divide the right into 5 31 bit chunks
+    long[] right = new long[5];
+    right[0] = b.lo8 & 0x7fffffffl;
+    right[1] = (b.lo8 >>> 31) & 0x7fffffffl;
+    right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2);
+    right[3] = (b.hi8 >>> 29) & 0x7fffffffl;
+    right[4] = (b.hi8 >>> 60);
+    // clear the cur value
+    set(0);
+    Unsigned16 tmp = new Unsigned16();
+    for(int l=0; l < 4; ++l) {
+      for (int r=0; r < 5; ++r) {
+        long prod = left[l] * right[r];
+        if (prod != 0) {
+          int off = l*32 + r*31;
+          tmp.set(prod);
+          tmp.shiftLeft(off);
+          add(tmp);
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the given number into the current number.
+   * @param b the other number
+   */
+  public void add(Unsigned16 b) {
+    long sumHi;
+    long sumLo;
+    long  reshibit, hibit0, hibit1;
+
+    sumHi = hi8 + b.hi8;
+
+    hibit0 = (lo8 & 0x8000000000000000L);
+    hibit1 = (b.lo8 & 0x8000000000000000L);
+    sumLo = lo8 + b.lo8;
+    reshibit = (sumLo & 0x8000000000000000L);
+    if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0))
+      sumHi++;  /* add carry bit */
+    hi8 = sumHi;
+    lo8 = sumLo;
+  }
+
+  /**
+   * Shift the number a given number of bit positions. The number is the low
+   * order bits of the result.
+   * @param bits the bit positions to shift by
+   */
+  public void shiftLeft(int bits) {
+    if (bits == 0) {
+      // Nothing to do. This also avoids "lo8 >>> 64" below, which Java would
+      // evaluate as a shift by 0.
+      return;
+    }
+    if (bits >= 128) {
+      // Everything is shifted out.
+      hi8 = 0;
+      lo8 = 0;
+    } else if (bits >= 64) {
+      // The low half moves into the high half, shifted by the remainder.
+      hi8 = lo8 << (bits - 64);
+      lo8 = 0;
+    } else {
+      // Bits leaving the top of lo8 are carried into the bottom of hi8.
+      hi8 = (hi8 << bits) | (lo8 >>> (64 - bits));
+      lo8 <<= bits;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Two consecutive longs are consumed: the high half first, then the low
+    // half. hi8 is assigned before the second read is attempted.
+    this.hi8 = in.readLong();
+    this.lo8 = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Emits two longs via writeLong: the high half first, then the low half.
+    out.writeLong(this.hi8);
+    out.writeLong(this.lo8);
+  }
+
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
new file mode 100644
index 0000000..f86a86e
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -0,0 +1,138 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper}
+import org.apache.wayang.apps.wordcount.WordCountScala
+import org.apache.wayang.core.api.Configuration
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.util.fs.FileSystems
+
+import java.util.regex.Pattern
+import scala.util.matching.Regex
+
+object TeraApp extends ExperimentDescriptor {
+
+  /** Length in bytes of the key part of a generated record. */
+  val KEY_LEN = 10
+  /** Length in bytes of the value part of a generated record. */
+  val VALUE_LEN = 100
+  /** Length in bytes of the buffer that holds one record: key plus value. */
+  val RECORD_LEN : Int = KEY_LEN + VALUE_LEN
+
+  override def version = "0.1.0"
+
+  /**
+   * Entry point of the TeraSort benchmark application.
+   *
+   * Expected arguments, in order: experiment descriptor, plugin list, task
+   * (generate, sort or validate), file size (see [[sizeStrToBytes]]), number
+   * of partitions, and optionally the input file and the output file. A
+   * missing file argument or the literal string "null" stands for "no file".
+   *
+   * @param args the command line arguments
+   */
+  def main(args: Array[String]): Unit = {
+    // The five leading arguments are always read below, so require them all.
+    if (args.length < 5) {
+      println(s"Usage: " +
+        s"${Parameters.experimentHelp} " +
+        s"<plugin(,plugin)*> " +
+        s"<task could be[generate|sort|validate]> " +
+        s"<file size ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T])> " +
+        s"<partitions> " +
+        s"<input file if not value is null> " +
+        s"<output file if not value is null>")
+      sys.exit(1)
+    }
+    implicit val configuration = new Configuration
+    implicit val experiment = Parameters.createExperiment(args(0), this)
+    val plugins = Parameters.loadPlugins(args(1))
+    experiment.getSubject.addConfiguration("plugins", args(1))
+    val task = args(2)
+    experiment.getSubject.addConfiguration("task", task)
+    val fileSize = sizeStrToBytes(args(3))
+    experiment.getSubject.addConfiguration("fileSize", fileSize)
+    val partitions = args(4).toInt
+    experiment.getSubject.addConfiguration("partitions", partitions)
+    // The file arguments are optional: absent and "null" both map to null.
+    val input_file = args.lift(5).filterNot(_ == "null").orNull
+    val output_file = args.lift(6).filterNot(_ == "null").orNull
+    experiment.getSubject.addConfiguration("inputFile", input_file)
+    experiment.getSubject.addConfiguration("outputFile", output_file)
+
+    task match {
+      case "generate" =>
+        require(output_file != null, "the generate task needs an output file")
+        new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
+      case "sort" | "validate" =>
+        // Accepted but not implemented yet, so say so instead of silently exiting.
+        Console.err.println(s"Task '$task' is not implemented yet.")
+      case other =>
+        throw new IllegalArgumentException(s"Unknown task '$other', expected generate, sort or validate")
+    }
+
+    // TODO: store the experiment data with ProfileDBHelper once the tasks report results.
+  }
+
+  /**
+   * Convert a size in the format ([0-9]+(.[0-9]+)?)([BkKmMgGtT]) to the
+   * number of bytes. The first occurrence of that format in the string is used.
+   *
+   * B = Bytes
+   * k|K = Kilobytes (1_024 Bytes)
+   * m|M = Megabytes (1_048_576 Bytes)
+   * g|G = Gigabytes (1_073_741_824 Bytes)
+   * t|T = Terabytes (1_099_511_627_776 Bytes)
+   *
+   * @param str the size, for example 10G or 1.5m
+   * @return number of bytes, with any fraction of a byte truncated
+   * @throws IllegalArgumentException if the string contains no such size
+   */
+  def sizeStrToBytes(str: String): Long = {
+    // Inside a character class '|' is a literal, so the units are listed bare.
+    val reg = "(\\d+(\\.\\d+)?)([BkKmMgGtT])"
+    val groups = Pattern.compile(reg).matcher(str)
+    // The group accessors throw IllegalStateException unless find() succeeded.
+    if (!groups.find()) {
+      throw new IllegalArgumentException(s"Invalid size '$str', expected e.g. 10G or 1.5m")
+    }
+
+    val number_part: Double = groups.group(1).toDouble
+    val conversion = groups.group(3) match { // the pattern admits only these units
+      case "B" => 1L //2^0
+      case "k" | "K" => 1024L //2^10
+      case "m" | "M" => 1048576L //2^20
+      case "g" | "G" => 1073741824L //2^30
+      case "t" | "T" => 1099511627776L //2^40
+    }
+    (number_part * conversion).toLong
+  }
+
+  /**
+   * Take a number that represents a size in bytes and return the human
+   * readable version, expressed in the largest binary unit that fits at least
+   * once and truncated to a whole number of that unit (1536 gives "1KB").
+   *
+   * @param size number that represents the size in bytes
+   * @return human readable version of the size
+   */
+  def sizeToSizeStr(size: Long): String = {
+    val kbScale: Long = 1024L
+    val mbScale: Long = 1024L * kbScale
+    val gbScale: Long = 1024L * mbScale
+    val tbScale: Long = 1024L * gbScale
+
+    // ">=" so an exact unit is reported as itself: 1024 gives "1KB", not "1024B".
+    if (size >= tbScale) {
+      s"${size / tbScale}TB"
+    } else if (size >= gbScale) {
+      s"${size / gbScale}GB"
+    } else if (size >= mbScale) {
+      s"${size / mbScale}MB"
+    } else if (size >= kbScale) {
+      s"${size / kbScale}KB"
+    } else {
+      s"${size}B"
+    }
+  }
+
+}
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala
new file mode 100644
index 0000000..89ac49d
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala
@@ -0,0 +1,111 @@
+package org.apache.wayang.apps.terasort
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+class TeraGen(@transient plugins: Plugin*) extends Serializable {
+
+  /**
+   * Build and run the plan that generates the records and writes them out.
+   *
+   * @param output_url url the records are written to as an object file
+   * @param file_size  requested amount of data in bytes
+   * @param partitions number of partitions to generate, must be positive
+   */
+  def apply(output_url: String, file_size: Long, partitions: Int)
+           (implicit configuration: Configuration, experiment: Experiment) = {
+    require(partitions > 0, s"partitions must be positive but was $partitions")
+    val wayangCtx = new WayangContext(configuration)
+    plugins.foreach(wayangCtx.register)
+    val planBuilder = new PlanBuilder(wayangCtx)
+
+    // NOTE(review): the divisor is VALUE_LEN, not RECORD_LEN; confirm which is meant.
+    val recordsPerPartition = file_size / TeraApp.VALUE_LEN / partitions.toLong
+    val numRecords = recordsPerPartition * partitions.toLong
+    assert(recordsPerPartition < Int.MaxValue, s"records per partition > ${Int.MaxValue}")
+
+    println("===========================================================================")
+    println("===========================================================================")
+    println(s"Input size: $file_size")
+    println(s"Total number of records: $numRecords")
+    println(s"Number of output partitions: $partitions")
+    println("Number of records/output partition: " + (numRecords / partitions))
+    println("===========================================================================")
+    println("===========================================================================")
+
+    // Partition indices are 0-based so record numbers start at 0, not at recordsPerPartition.
+    planBuilder
+      .withJobName(s"Terasort generate ${file_size}")
+      .withExperiment(experiment)
+      .withUdfJarsOf(this.getClass)
+      .loadCollection(0 until partitions)
+      .flatMap( index => {
+        val one = new Unsigned16(1)
+        val firstRecordNumber = new Unsigned16(index.toLong * recordsPerPartition.toLong)
+        val recordNumber = new Unsigned16(firstRecordNumber)
+        val rand = Random16.skipAhead(firstRecordNumber)
+        Iterator.tabulate(recordsPerPartition.toInt) { _ =>
+          val rowBytes: Array[Byte] = new Array[Byte](TeraApp.RECORD_LEN)
+          val key = new Array[Byte](TeraApp.KEY_LEN)
+          val value = new Array[Byte](TeraApp.VALUE_LEN)
+          Random16.nextRand(rand)
+          generateRecord(rowBytes, rand, recordNumber)
+          recordNumber.add(one)
+          rowBytes.copyToArray(key, 0, TeraApp.KEY_LEN)
+          rowBytes.takeRight(TeraApp.VALUE_LEN).copyToArray(value, 0, TeraApp.VALUE_LEN)
+          (key, value)
+        }.toStream
+      })
+      .writeObjectFile(output_url)
+  }
+
+  /**
+   * Generate a binary record suitable for all sort benchmarks except PennySort.
+   * @param recBuf       record to return; bytes 0 to 99 are written
+   * @param rand         random number the key and the filler are taken from
+   * @param recordNumber number of the record, written as 32 hexadecimal digits
+   */
+  def generateRecord(recBuf: Array[Byte], rand: Unsigned16, recordNumber: Unsigned16): Unit = {
+    // Generate the 10-byte key using the high 10 bytes of the 128-bit random number
+    var i = 0
+    while (i < 10) {
+      recBuf(i) = rand.getByte(i)
+      i += 1
+    }
+
+    // Add 2 bytes of "break"
+    recBuf(10) = 0x00.toByte
+    recBuf(11) = 0x11.toByte
+
+    // Convert the 128-bit record number to 32 digits of ascii hexadecimal
+    // as the next 32 bytes of the record.
+    i = 0
+    while (i < 32) {
+      recBuf(12 + i) = recordNumber.getHexDigit(i).toByte
+      i += 1
+    }
+
+    // Add 4 bytes of "break" data
+    recBuf(44) = 0x88.toByte
+    recBuf(45) = 0x99.toByte
+    recBuf(46) = 0xAA.toByte
+    recBuf(47) = 0xBB.toByte
+
+    // Add 48 bytes of filler based on low 48 bits of random number
+    i = 0
+    while (i < 12) {
+      val v = rand.getHexDigit(20 + i).toByte
+      java.util.Arrays.fill(recBuf, 48 + i * 4, 52 + i * 4, v) // each digit fills 4 bytes
+      i += 1
+    }
+
+    // Add 4 bytes of "break" data
+    recBuf(96) = 0xCC.toByte
+    recBuf(97) = 0xDD.toByte
+    recBuf(98) = 0xEE.toByte
+    recBuf(99) = 0xFF.toByte
+  }
+
+}