Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:59 UTC
[14/25] git commit: TEZ-1578. Remove TeraSort from Tez codebase. (hitesh)
TEZ-1578. Remove TeraSort from Tez codebase. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8e382b34
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8e382b34
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8e382b34
Branch: refs/heads/TEZ-8
Commit: 8e382b34d9487faea56ac0a8a31c53745aacdc3d
Parents: b62298e
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 12 11:11:47 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 12 11:11:47 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../tez/mapreduce/examples/ExampleDriver.java | 9 -
.../mapreduce/examples/terasort/GenSort.java | 251 -------------
.../mapreduce/examples/terasort/Random16.java | 374 -------------------
.../examples/terasort/TeraChecksum.java | 103 -----
.../mapreduce/examples/terasort/TeraGen.java | 311 ---------------
.../examples/terasort/TeraInputFormat.java | 353 -----------------
.../examples/terasort/TeraOutputFormat.java | 112 ------
.../examples/terasort/TeraScheduler.java | 253 -------------
.../mapreduce/examples/terasort/TeraSort.java | 335 -----------------
.../examples/terasort/TeraValidate.java | 188 ----------
.../mapreduce/examples/terasort/Unsigned16.java | 297 ---------------
.../examples/terasort/TestTeraSort.java | 94 -----
13 files changed, 3 insertions(+), 2680 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff53e04..87729b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
+ TEZ-1578. Remove TeraSort from Tez codebase.
ALL CHANGES:
TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
@@ -12,6 +13,8 @@ ALL CHANGES:
TEZ-850. Recovery unit tests.
TEZ-853. Support counters recovery.
TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
+ TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
+ TEZ-1578. Remove TeraSort from Tez codebase.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 977b767..3824607 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -31,9 +31,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.mapreduce.examples.terasort.TeraGen;
-import org.apache.tez.mapreduce.examples.terasort.TeraSort;
-import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
/**
* A description of an example program based on its class and a
@@ -65,12 +62,6 @@ public class ExampleDriver {
pgd.addClass("join", Join.class,
"A job that effects a join over sorted, equally partitioned"
+ " datasets");
- pgd.addClass("teragen", TeraGen.class,
- "Generate data for the terasort");
- pgd.addClass("terasort", TeraSort.class,
- "Run the terasort");
- pgd.addClass("teravalidate", TeraValidate.class,
- "Checking results of terasort");
pgd.addClass("groupbyorderbymrrtest", GroupByOrderByMRRTest.class,
"A map-reduce-reduce program that does groupby-order by. Takes input"
+ " containing employee_name department name per line of input"
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
deleted file mode 100644
index c8517e5..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.math.BigInteger;
-import java.util.zip.Checksum;
-
-import org.apache.hadoop.util.PureJavaCrc32;
-
-/**
- * A single process data generator for the terasort data. Based on gensort.c
- * version 1.1 (3 Mar 2009) from Chris Nyberg <ch...@ordinal.com>.
- */
-public class GenSort {
-
- /**
- * Generate a "binary" record suitable for all sort benchmarks *except*
- * PennySort.
- */
- static void generateRecord(byte[] recBuf, Unsigned16 rand,
- Unsigned16 recordNumber) {
- /* generate the 10-byte key using the high 10 bytes of the 128-bit
- * random number
- */
- for(int i=0; i < 10; ++i) {
- recBuf[i] = rand.getByte(i);
- }
-
- /* add 2 bytes of "break" */
- recBuf[10] = 0x00;
- recBuf[11] = 0x11;
-
- /* convert the 128-bit record number to 32 bits of ascii hexadecimal
- * as the next 32 bytes of the record.
- */
- for (int i = 0; i < 32; i++) {
- recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
- }
-
- /* add 4 bytes of "break" data */
- recBuf[44] = (byte) 0x88;
- recBuf[45] = (byte) 0x99;
- recBuf[46] = (byte) 0xAA;
- recBuf[47] = (byte) 0xBB;
-
- /* add 48 bytes of filler based on low 48 bits of random number */
- for(int i=0; i < 12; ++i) {
- recBuf[48+i*4] = recBuf[49+i*4] = recBuf[50+i*4] = recBuf[51+i*4] =
- (byte) rand.getHexDigit(20 + i);
- }
-
- /* add 4 bytes of "break" data */
- recBuf[96] = (byte) 0xCC;
- recBuf[97] = (byte) 0xDD;
- recBuf[98] = (byte) 0xEE;
- recBuf[99] = (byte) 0xFF;
- }
-
-
- private static BigInteger makeBigInteger(long x) {
- byte[] data = new byte[8];
- for(int i=0; i < 8; ++i) {
- data[i] = (byte) (x >>> (56 - 8*i));
- }
- return new BigInteger(1, data);
- }
-
- private static final BigInteger NINETY_FIVE = new BigInteger("95");
-
- /**
- * Generate an ascii record suitable for all sort benchmarks including
- * PennySort.
- */
- static void generateAsciiRecord(byte[] recBuf, Unsigned16 rand,
- Unsigned16 recordNumber) {
-
- /* generate the 10-byte ascii key using mostly the high 64 bits.
- */
- long temp = rand.getHigh8();
- if (temp < 0) {
- // use biginteger to avoid the negative sign problem
- BigInteger bigTemp = makeBigInteger(temp);
- recBuf[0] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
- temp = bigTemp.divide(NINETY_FIVE).longValue();
- } else {
- recBuf[0] = (byte) (' ' + (temp % 95));
- temp /= 95;
- }
- for(int i=1; i < 8; ++i) {
- recBuf[i] = (byte) (' ' + (temp % 95));
- temp /= 95;
- }
- temp = rand.getLow8();
- if (temp < 0) {
- BigInteger bigTemp = makeBigInteger(temp);
- recBuf[8] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
- temp = bigTemp.divide(NINETY_FIVE).longValue();
- } else {
- recBuf[8] = (byte) (' ' + (temp % 95));
- temp /= 95;
- }
- recBuf[9] = (byte)(' ' + (temp % 95));
-
- /* add 2 bytes of "break" */
- recBuf[10] = ' ';
- recBuf[11] = ' ';
-
- /* convert the 128-bit record number to 32 bits of ascii hexadecimal
- * as the next 32 bytes of the record.
- */
- for (int i = 0; i < 32; i++) {
- recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
- }
-
- /* add 2 bytes of "break" data */
- recBuf[44] = ' ';
- recBuf[45] = ' ';
-
- /* add 52 bytes of filler based on low 48 bits of random number */
- for(int i=0; i < 13; ++i) {
- recBuf[46+i*4] = recBuf[47+i*4] = recBuf[48+i*4] = recBuf[49+i*4] =
- (byte) rand.getHexDigit(19 + i);
- }
-
- /* add 2 bytes of "break" data */
- recBuf[98] = '\r'; /* nice for Windows */
- recBuf[99] = '\n';
-}
-
-
- private static void usage() {
- PrintStream out = System.out;
- out.println("usage: gensort [-a] [-c] [-bSTARTING_REC_NUM] NUM_RECS FILE_NAME");
- out.println("-a Generate ascii records required for PennySort or JouleSort.");
- out.println(" These records are also an alternative input for the other");
- out.println(" sort benchmarks. Without this flag, binary records will be");
- out.println(" generated that contain the highest density of randomness in");
- out.println(" the 10-byte key.");
- out.println( "-c Calculate the sum of the crc32 checksums of each of the");
- out.println(" generated records and send it to standard error.");
- out.println("-bN Set the beginning record generated to N. By default the");
- out.println(" first record generated is record 0.");
- out.println("NUM_RECS The number of sequential records to generate.");
- out.println("FILE_NAME The name of the file to write the records to.\n");
- out.println("Example 1 - to generate 1000000 ascii records starting at record 0 to");
- out.println("the file named \"pennyinput\":");
- out.println(" gensort -a 1000000 pennyinput\n");
- out.println("Example 2 - to generate 1000 binary records beginning with record 2000");
- out.println("to the file named \"partition2\":");
- out.println(" gensort -b2000 1000 partition2");
- System.exit(1);
- }
-
-
- public static void outputRecords(OutputStream out,
- boolean useAscii,
- Unsigned16 firstRecordNumber,
- Unsigned16 recordsToGenerate,
- Unsigned16 checksum
- ) throws IOException {
- byte[] row = new byte[100];
- Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
- Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
- Checksum crc = new PureJavaCrc32();
- Unsigned16 tmp = new Unsigned16();
- lastRecordNumber.add(recordsToGenerate);
- Unsigned16 ONE = new Unsigned16(1);
- Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
- while (!recordNumber.equals(lastRecordNumber)) {
- Random16.nextRand(rand);
- if (useAscii) {
- generateAsciiRecord(row, rand, recordNumber);
- } else {
- generateRecord(row, rand, recordNumber);
- }
- if (checksum != null) {
- crc.reset();
- crc.update(row, 0, row.length);
- tmp.set(crc.getValue());
- checksum.add(tmp);
- }
- recordNumber.add(ONE);
- out.write(row);
- }
- }
-
- public static void main(String[] args) throws Exception {
- Unsigned16 startingRecord = new Unsigned16();
- Unsigned16 numberOfRecords;
- OutputStream out;
- boolean useAscii = false;
- Unsigned16 checksum = null;
-
- int i;
- for(i=0; i < args.length; ++i) {
- String arg = args[i];
- int argLength = arg.length();
- if (argLength >= 1 && arg.charAt(0) == '-') {
- if (argLength < 2) {
- usage();
- }
- switch (arg.charAt(1)) {
- case 'a':
- useAscii = true;
- break;
- case 'b':
- startingRecord = Unsigned16.fromDecimal(arg.substring(2));
- break;
- case 'c':
- checksum = new Unsigned16();
- break;
- default:
- usage();
- }
- } else {
- break;
- }
- }
- if (args.length - i != 2) {
- usage();
- }
- numberOfRecords = Unsigned16.fromDecimal(args[i]);
- out = new FileOutputStream(args[i+1]);
-
- outputRecords(out, useAscii, startingRecord, numberOfRecords, checksum);
- out.close();
- if (checksum != null) {
- System.out.println(checksum);
- }
- }
-
-}
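
For reference, the removed GenSort produced fixed 100-byte binary records: a 10-byte key taken from the high bytes of the 128-bit random number, a 2-byte break, the 128-bit row id as 32 ascii-hex digits, a 4-byte break, 48 bytes of filler, and a closing 4-byte break. A minimal standalone sketch of that layout follows; the class name is hypothetical and the caller is assumed to supply the key, hex row id, and filler bytes (this is not the removed implementation):

// Layout sketch for the 100-byte binary record described above.
public class RecordLayoutSketch {
  static byte[] buildRecord(byte[] keyBytes10, byte[] rowIdHex32, byte[] filler48) {
    byte[] rec = new byte[100];
    System.arraycopy(keyBytes10, 0, rec, 0, 10);   // 10-byte key
    rec[10] = 0x00; rec[11] = 0x11;                // 2-byte "break"
    System.arraycopy(rowIdHex32, 0, rec, 12, 32);  // 32-byte ascii-hex row id
    rec[44] = (byte) 0x88; rec[45] = (byte) 0x99;  // 4-byte "break"
    rec[46] = (byte) 0xAA; rec[47] = (byte) 0xBB;
    System.arraycopy(filler48, 0, rec, 48, 48);    // 48 bytes of filler
    rec[96] = (byte) 0xCC; rec[97] = (byte) 0xDD;  // closing 4-byte "break"
    rec[98] = (byte) 0xEE; rec[99] = (byte) 0xFF;
    return rec;
  }
}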
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
deleted file mode 100644
index 31cbd48..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.examples.terasort;
-
-/**
- * This class implements a 128-bit linear congruential generator.
- * Specifically, if X0 is the most recently issued 128-bit random
- * number (or a seed of 0 if no random number has already been generated,
- * the next number to be generated, X1, is equal to:
- * X1 = (a * X0 + c) mod 2**128
- * where a is 47026247687942121848144207491837523525
- * or 0x2360ed051fc65da44385df649fccf645
- * and c is 98910279301475397889117759788405497857
- * or 0x4a696d47726179524950202020202001
- * The coefficient "a" is suggested by:
- * Pierre L'Ecuyer, "Tables of linear congruential generators of different
- * sizes and good lattice structure", Mathematics of Computation, 68
- * pp. 249 - 260 (1999)
- * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
- * The constant "c" meets the simple suggestion by the same reference that
- * it be odd.
- *
- * There is also a facility for quickly advancing the state of the
- * generator by a fixed number of steps - this facilitates parallel
- * generation.
- *
- * This is based on 1.0 of rand16.c from Chris Nyberg
- * <ch...@ordinal.com>.
- */
-class Random16 {
-
- /**
- * The "Gen" array contain powers of 2 of the linear congruential generator.
- * The index 0 struct contain the "a" coefficient and "c" constant for the
- * generator. That is, the generator is:
- * f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128
- *
- * All structs after the first contain an "a" and "c" that
- * comprise the square of the previous function.
- *
- * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128
- * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128
- * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128
- * ...
-
- */
- private static class RandomConstant {
- final Unsigned16 a;
- final Unsigned16 c;
- public RandomConstant(String left, String right) {
- a = new Unsigned16(left);
- c = new Unsigned16(right);
- }
- }
-
- private static final RandomConstant[] genArray = new RandomConstant[]{
- /* [ 0] */ new RandomConstant("2360ed051fc65da44385df649fccf645",
- "4a696d47726179524950202020202001"),
- /* [ 1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
- "95e0e48262b3edfe04479485c755b646"),
- /* [ 2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
- "882a02c315362b60765f100068b33a1c"),
- /* [ 3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
- "5efc4abfaca23e8ca8edb1f2dfbf6478"),
- /* [ 4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
- "f25bd15439d16af594c1b1bafa6239f0"),
- /* [ 5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
- "89ca67c29c9397d59c612596145db7e0"),
- /* [ 6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01",
- "8b6ae036713bd578a8093c8eae5c7fc0"),
- /* [ 7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01",
- "98a2542fd23d0dbdff3b886cdb1d3f80"),
- /* [ 8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
- "954db923fdb7933e947cd1edcecb7f00"),
- /* [ 9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
- "00be4a36657c98cd204e8c8af7dafe00"),
- /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
- "991965329dccb28d581199ab18c5fc00"),
- /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
- "e1a8705b63ad5b8cd6c3d268d5cbf800"),
- /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
- "2b657bbfd6ed9d632079e70c3c97f000"),
- /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
- "59b60ee4c52fa49e9fe90682bd2fe000"),
- /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
- "cc099c88030679464fe86aae8a5fc000"),
- /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001",
- "06b9abff9f9f33dd30362c0154bf8000"),
- /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
- "e296707121688d5a0260b293a97f0000"),
- /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
- "189ffc4701ff23cb8f8acf6b52fe0000"),
- /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
- "5141110ab208fb9d61fb47e6a5fc0000"),
- /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
- "3c97caa62540f2948d8d340d4bf80000"),
- /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
- "1b25cb9cfe5a0c963174f91a97f00000"),
- /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
- "0c644570b4a487103c5436352fe00000"),
- /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001",
- "3d0589c28869472bde517c6a5fc00000"),
- /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
- "bc95e5ab36477e65534738d4bf800000"),
- /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
- "ddb02ff72a031c01011f71a97f000000"),
- /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
- "2561426086d9acdb6c82e352fe000000"),
- /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
- "64a788e3c118ed1c8215c6a5fc000000"),
- /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
- "e65ea321908627cfa86b8d4bf8000000"),
- /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
- "53d27225604d85f9e1d71a97f0000000"),
- /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001",
- "ca5ec7a3ed1fe55e07ae352fe0000000"),
- /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001",
- "4daebb2e085330651f5c6a5fc0000000"),
- /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001",
- "9d6f1a00a8f3f76e7eb8d4bf80000000"),
- /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001",
- "158c62f2b31e496dfd71a97f00000000"),
- /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001",
- "290e84a2eb15fd1ffae352fe00000000"),
- /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001",
- "e3dc1bfbe991a34ff5c6a5fc00000000"),
- /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
- "ddf540d020b9eadfeb8d4bf800000000"),
- /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
- "8ee4950177ce66bfd71a97f000000000"),
- /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001",
- "39e0f787c907117fae352fe000000000"),
- /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001",
- "659d2522f7b732ff5c6a5fc000000000"),
- /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001",
- "9e8722938612a5feb8d4bf8000000000"),
- /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001",
- "e941a65d66b64bfd71a97f0000000000"),
- /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001",
- "7b50d19437b097fae352fe0000000000"),
- /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001",
- "59d7b68e18712ff5c6a5fc0000000000"),
- /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001",
- "4087bab2d5225feb8d4bf80000000000"),
- /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
- "b470abc03b44bfd71a97f00000000000"),
- /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001",
- "366630eaba897fae352fe00000000000"),
- /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001",
- "a2dfc77e8512ff5c6a5fc00000000000"),
- /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001",
- "1e0d25a14a25feb8d4bf800000000000"),
- /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001",
- "9d50a5d3944bfd71a97f000000000000"),
- /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001",
- "bf7ab5eb2897fae352fe000000000000"),
- /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001",
- "925b14e6512ff5c6a5fc000000000000"),
- /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
- "724cce0ca25feb8d4bf8000000000000"),
- /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001",
- "1af42d1944bfd71a97f0000000000000"),
- /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001",
- "0f529e32897fae352fe0000000000000"),
- /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001",
- "844e4c6512ff5c6a5fc0000000000000"),
- /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
- "9f40d8ca25feb8d4bf80000000000000"),
- /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
- "9912b1944bfd71a97f00000000000000"),
- /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
- "9c69632897fae352fe00000000000000"),
- /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
- "e1e2c6512ff5c6a5fc00000000000000"),
- /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
- "68058ca25feb8d4bf800000000000000"),
- /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
- "610b1944bfd71a97f000000000000000"),
- /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
- "061632897fae352fe000000000000000"),
- /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
- "1c2c6512ff5c6a5fc000000000000000"),
- /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
- "7858ca25feb8d4bf8000000000000000"),
- /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
- "f0b1944bfd71a97f0000000000000000"),
- /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
- "e1632897fae352fe0000000000000000"),
- /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
- "c2c6512ff5c6a5fc0000000000000000"),
- /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
- "858ca25feb8d4bf80000000000000000"),
- /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
- "0b1944bfd71a97f00000000000000000"),
- /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
- "1632897fae352fe00000000000000000"),
- /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
- "2c6512ff5c6a5fc00000000000000000"),
- /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
- "58ca25feb8d4bf800000000000000000"),
- /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
- "b1944bfd71a97f000000000000000000"),
- /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
- "632897fae352fe000000000000000000"),
- /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
- "c6512ff5c6a5fc000000000000000000"),
- /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
- "8ca25feb8d4bf8000000000000000000"),
- /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
- "1944bfd71a97f0000000000000000000"),
- /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
- "32897fae352fe0000000000000000000"),
- /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
- "6512ff5c6a5fc0000000000000000000"),
- /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
- "ca25feb8d4bf80000000000000000000"),
- /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
- "944bfd71a97f00000000000000000000"),
- /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
- "2897fae352fe00000000000000000000"),
- /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
- "512ff5c6a5fc00000000000000000000"),
- /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
- "a25feb8d4bf800000000000000000000"),
- /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
- "44bfd71a97f000000000000000000000"),
- /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
- "897fae352fe000000000000000000000"),
- /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
- "12ff5c6a5fc000000000000000000000"),
- /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
- "25feb8d4bf8000000000000000000000"),
- /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
- "4bfd71a97f0000000000000000000000"),
- /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
- "97fae352fe0000000000000000000000"),
- /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
- "2ff5c6a5fc0000000000000000000000"),
- /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
- "5feb8d4bf80000000000000000000000"),
- /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
- "bfd71a97f00000000000000000000000"),
- /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
- "7fae352fe00000000000000000000000"),
- /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
- "ff5c6a5fc00000000000000000000000"),
- /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
- "feb8d4bf800000000000000000000000"),
- /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
- "fd71a97f000000000000000000000000"),
- /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
- "fae352fe000000000000000000000000"),
- /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
- "f5c6a5fc000000000000000000000000"),
- /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
- "eb8d4bf8000000000000000000000000"),
- /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
- "d71a97f0000000000000000000000000"),
- /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
- "ae352fe0000000000000000000000000"),
- /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
- "5c6a5fc0000000000000000000000000"),
- /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
- "b8d4bf80000000000000000000000000"),
- /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
- "71a97f00000000000000000000000000"),
- /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
- "e352fe00000000000000000000000000"),
- /* [106] */ new RandomConstant("363ef000000000000000000000000001",
- "c6a5fc00000000000000000000000000"),
- /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
- "8d4bf800000000000000000000000000"),
- /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
- "1a97f000000000000000000000000000"),
- /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
- "352fe000000000000000000000000000"),
- /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
- "6a5fc000000000000000000000000000"),
- /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
- "d4bf8000000000000000000000000000"),
- /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
- "a97f0000000000000000000000000000"),
- /* [113] */ new RandomConstant("1f780000000000000000000000000001",
- "52fe0000000000000000000000000000"),
- /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
- "a5fc0000000000000000000000000000"),
- /* [115] */ new RandomConstant("7de00000000000000000000000000001",
- "4bf80000000000000000000000000000"),
- /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
- "97f00000000000000000000000000000"),
- /* [117] */ new RandomConstant("f7800000000000000000000000000001",
- "2fe00000000000000000000000000000"),
- /* [118] */ new RandomConstant("ef000000000000000000000000000001",
- "5fc00000000000000000000000000000"),
- /* [119] */ new RandomConstant("de000000000000000000000000000001",
- "bf800000000000000000000000000000"),
- /* [120] */ new RandomConstant("bc000000000000000000000000000001",
- "7f000000000000000000000000000000"),
- /* [121] */ new RandomConstant("78000000000000000000000000000001",
- "fe000000000000000000000000000000"),
- /* [122] */ new RandomConstant("f0000000000000000000000000000001",
- "fc000000000000000000000000000000"),
- /* [123] */ new RandomConstant("e0000000000000000000000000000001",
- "f8000000000000000000000000000000"),
- /* [124] */ new RandomConstant("c0000000000000000000000000000001",
- "f0000000000000000000000000000000"),
- /* [125] */ new RandomConstant("80000000000000000000000000000001",
- "e0000000000000000000000000000000"),
- /* [126] */ new RandomConstant("00000000000000000000000000000001",
- "c0000000000000000000000000000000"),
- /* [127] */ new RandomConstant("00000000000000000000000000000001",
- "80000000000000000000000000000000")};
-
- /**
- * generate the random number that is "advance" steps
- * from an initial random number of 0. This is done by
- * starting with 0, and then advancing the by the
- * appropriate powers of 2 of the linear congruential
- * generator.
- */
- public static Unsigned16 skipAhead(Unsigned16 advance) {
- Unsigned16 result = new Unsigned16();
- long bit_map;
-
- bit_map = advance.getLow8();
- for (int i = 0; bit_map != 0 && i < 64; i++) {
- if ((bit_map & (1L << i)) != 0) {
- /* advance random number by f**(2**i) (x)
- */
- result.multiply(genArray[i].a);
- result.add(genArray[i].c);
- bit_map &= ~(1L << i);
- }
- }
- bit_map = advance.getHigh8();
- for (int i = 0; bit_map != 0 && i < 64; i++)
- {
- if ((bit_map & (1L << i)) != 0) {
- /* advance random number by f**(2**(i + 64)) (x)
- */
- result.multiply(genArray[i+64].a);
- result.add(genArray[i+64].c);
- bit_map &= ~(1L << i);
- }
- }
- return result;
- }
-
- /**
- * Generate the next 16 byte random number.
- */
- public static void nextRand(Unsigned16 rand) {
- /* advance the random number forward once using the linear congruential
- * generator, and then return the new random number
- */
- rand.multiply(genArray[0].a);
- rand.add(genArray[0].c);
- }
-}
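
The Random16 class removed above implements the 128-bit linear congruential generator x' = (a*x + c) mod 2**128, with skip-ahead via precomputed powers of two of the map. A BigInteger sketch of the same recurrence and skip-ahead (illustrative only; the removed code uses fixed-width Unsigned16 arithmetic and a precomputed table instead of squaring on the fly):

import java.math.BigInteger;

// 128-bit LCG sketch: next() advances one step, skipAhead() jumps 'steps' steps
// from a seed of 0 by repeatedly squaring the affine map f(x) = a*x + c.
public class Lcg128Sketch {
  static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);
  static final BigInteger A = new BigInteger("2360ed051fc65da44385df649fccf645", 16);
  static final BigInteger C = new BigInteger("4a696d47726179524950202020202001", 16);

  static BigInteger next(BigInteger x) {
    return A.multiply(x).add(C).mod(MOD);
  }

  static BigInteger skipAhead(BigInteger steps) {
    BigInteger a = A, c = C, x = BigInteger.ZERO;
    while (steps.signum() > 0) {
      if (steps.testBit(0)) {
        x = a.multiply(x).add(c).mod(MOD);  // apply the current power of the map
      }
      c = a.multiply(c).add(c).mod(MOD);    // compose the map with itself:
      a = a.multiply(a).mod(MOD);           // (a, c) -> (a*a, a*c + c)
      steps = steps.shiftRight(1);
    }
    return x;
  }
}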
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
deleted file mode 100644
index a5b408b..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.IOException;
-import java.util.zip.Checksum;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.PureJavaCrc32;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class TeraChecksum extends Configured implements Tool {
- static class ChecksumMapper
- extends Mapper<Text, Text, NullWritable, Unsigned16> {
- private Unsigned16 checksum = new Unsigned16();
- private Unsigned16 sum = new Unsigned16();
- private Checksum crc32 = new PureJavaCrc32();
-
- public void map(Text key, Text value,
- Context context) throws IOException {
- crc32.reset();
- crc32.update(key.getBytes(), 0, key.getLength());
- crc32.update(value.getBytes(), 0, value.getLength());
- checksum.set(crc32.getValue());
- sum.add(checksum);
- }
-
- public void cleanup(Context context)
- throws IOException, InterruptedException {
- context.write(NullWritable.get(), sum);
- }
- }
-
- static class ChecksumReducer
- extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
-
- public void reduce(NullWritable key, Iterable<Unsigned16> values,
- Context context) throws IOException, InterruptedException {
- Unsigned16 sum = new Unsigned16();
- for (Unsigned16 val : values) {
- sum.add(val);
- }
- context.write(key, sum);
- }
- }
-
- private static void usage() throws IOException {
- System.err.println("terasum <out-dir> <report-dir>");
- }
-
- public int run(String[] args) throws Exception {
- Job job = Job.getInstance(getConf());
- if (args.length != 2) {
- usage();
- return 2;
- }
- TeraInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setJobName("TeraSum");
- job.setJarByClass(TeraChecksum.class);
- job.setMapperClass(ChecksumMapper.class);
- job.setReducerClass(ChecksumReducer.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Unsigned16.class);
- // force a single reducer
- job.setNumReduceTasks(1);
- job.setInputFormatClass(TeraInputFormat.class);
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args);
- System.exit(res);
- }
-
-}
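
TeraChecksum, removed above, summed a CRC32 over the key and value bytes of every record and aggregated the sums in a single reducer. A standalone sketch of the per-record accumulation (java.util.zip.CRC32 and BigInteger stand in here for Hadoop's PureJavaCrc32 and the removed Unsigned16; the class name is hypothetical):

import java.math.BigInteger;
import java.util.zip.CRC32;

// Accumulates the sum of per-record CRC32 checksums, as the removed mapper did.
public class ChecksumSketch {
  private final CRC32 crc32 = new CRC32();
  private BigInteger sum = BigInteger.ZERO;

  void addRecord(byte[] key, byte[] value) {
    crc32.reset();
    crc32.update(key, 0, key.length);
    crc32.update(value, 0, value.length);
    sum = sum.add(BigInteger.valueOf(crc32.getValue()));  // add this record's crc
  }

  BigInteger total() {
    return sum;
  }
}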
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
deleted file mode 100644
index 70a4207..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.zip.Checksum;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.PureJavaCrc32;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Generate the official GraySort input data set.
- * The user specifies the number of rows and the output directory and this
- * class runs a map/reduce program to generate the data.
- * The format of the data is:
- * <ul>
- * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid)
- * (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
- * <li>The rowid is the right justified row id as a hex number.
- * </ul>
- *
- * <p>
- * To run the program:
- * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
- */
-public class TeraGen extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(TeraSort.class);
-
- public static enum Counters {CHECKSUM}
-
- public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
- /**
- * An input format that assigns ranges of longs to each mapper.
- */
- static class RangeInputFormat
- extends InputFormat<LongWritable, NullWritable> {
-
- /**
- * An input split consisting of a range on numbers.
- */
- static class RangeInputSplit extends InputSplit implements Writable {
- long firstRow;
- long rowCount;
-
- public RangeInputSplit() { }
-
- public RangeInputSplit(long offset, long length) {
- firstRow = offset;
- rowCount = length;
- }
-
- public long getLength() throws IOException {
- return 0;
- }
-
- public String[] getLocations() throws IOException {
- return new String[]{};
- }
-
- public void readFields(DataInput in) throws IOException {
- firstRow = WritableUtils.readVLong(in);
- rowCount = WritableUtils.readVLong(in);
- }
-
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVLong(out, firstRow);
- WritableUtils.writeVLong(out, rowCount);
- }
- }
-
- /**
- * A record reader that will generate a range of numbers.
- */
- static class RangeRecordReader
- extends RecordReader<LongWritable, NullWritable> {
- long startRow;
- long finishedRows;
- long totalRows;
- LongWritable key = null;
-
- public RangeRecordReader() {
- }
-
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- startRow = ((RangeInputSplit)split).firstRow;
- finishedRows = 0;
- totalRows = ((RangeInputSplit)split).rowCount;
- }
-
- public void close() throws IOException {
- // NOTHING
- }
-
- public LongWritable getCurrentKey() {
- return key;
- }
-
- public NullWritable getCurrentValue() {
- return NullWritable.get();
- }
-
- public float getProgress() throws IOException {
- return finishedRows / (float) totalRows;
- }
-
- public boolean nextKeyValue() {
- if (key == null) {
- key = new LongWritable();
- }
- if (finishedRows < totalRows) {
- key.set(startRow + finishedRows);
- finishedRows += 1;
- return true;
- } else {
- return false;
- }
- }
-
- }
-
- public RecordReader<LongWritable, NullWritable>
- createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- return new RangeRecordReader();
- }
-
- /**
- * Create the desired number of splits, dividing the number of rows
- * between the mappers.
- */
- public List<InputSplit> getSplits(JobContext job) {
- long totalRows = getNumberOfRows(job);
- int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
- LOG.info("Generating " + totalRows + " using " + numSplits);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- long currentRow = 0;
- for(int split = 0; split < numSplits; ++split) {
- long goal =
- (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
- splits.add(new RangeInputSplit(currentRow, goal - currentRow));
- currentRow = goal;
- }
- return splits;
- }
-
- }
-
- static long getNumberOfRows(JobContext job) {
- return job.getConfiguration().getLong(NUM_ROWS, 0);
- }
-
- static void setNumberOfRows(Job job, long numRows) {
- job.getConfiguration().setLong(NUM_ROWS, numRows);
- }
-
- /**
- * The Mapper class that given a row number, will generate the appropriate
- * output line.
- */
- public static class SortGenMapper
- extends Mapper<LongWritable, NullWritable, Text, Text> {
-
- private Text key = new Text();
- private Text value = new Text();
- private Unsigned16 rand = null;
- private Unsigned16 rowId = null;
- private Unsigned16 checksum = new Unsigned16();
- private Checksum crc32 = new PureJavaCrc32();
- private Unsigned16 total = new Unsigned16();
- private static final Unsigned16 ONE = new Unsigned16(1);
- private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
- TeraInputFormat.VALUE_LENGTH];
- private Counter checksumCounter;
-
- public void map(LongWritable row, NullWritable ignored,
- Context context) throws IOException, InterruptedException {
- if (rand == null) {
- rowId = new Unsigned16(row.get());
- rand = Random16.skipAhead(rowId);
- checksumCounter = context.getCounter(Counters.CHECKSUM);
- }
- Random16.nextRand(rand);
- GenSort.generateRecord(buffer, rand, rowId);
- key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
- value.set(buffer, TeraInputFormat.KEY_LENGTH,
- TeraInputFormat.VALUE_LENGTH);
- context.write(key, value);
- crc32.reset();
- crc32.update(buffer, 0,
- TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
- checksum.set(crc32.getValue());
- total.add(checksum);
- rowId.add(ONE);
- }
-
- @Override
- public void cleanup(Context context) {
- if (checksumCounter != null) {
- checksumCounter.increment(total.getLow8());
- }
- }
- }
-
- private static void usage() throws IOException {
- System.err.println("teragen <num rows> <output dir>");
- }
-
- /**
- * Parse a number that optionally has a postfix that denotes a base.
- * @param str an string integer with an option base {k,m,b,t}.
- * @return the expanded value
- */
- private static long parseHumanLong(String str) {
- char tail = str.charAt(str.length() - 1);
- long base = 1;
- switch (tail) {
- case 't':
- base *= 1000 * 1000 * 1000 * 1000;
- break;
- case 'b':
- base *= 1000 * 1000 * 1000;
- break;
- case 'm':
- base *= 1000 * 1000;
- break;
- case 'k':
- base *= 1000;
- break;
- default:
- }
- if (base != 1) {
- str = str.substring(0, str.length() - 1);
- }
- return Long.parseLong(str) * base;
- }
-
- /**
- * @param args the cli arguments
- */
- public int run(String[] args)
- throws IOException, InterruptedException, ClassNotFoundException {
- Job job = Job.getInstance(getConf());
- if (args.length != 2) {
- usage();
- return 2;
- }
- setNumberOfRows(job, parseHumanLong(args[0]));
- Path outputDir = new Path(args[1]);
- if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
- throw new IOException("Output directory " + outputDir +
- " already exists.");
- }
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setJobName("TeraGen");
- job.setJarByClass(TeraGen.class);
- job.setMapperClass(SortGenMapper.class);
- job.setNumReduceTasks(0);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setInputFormatClass(RangeInputFormat.class);
- job.setOutputFormatClass(TeraOutputFormat.class);
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
- System.exit(res);
- }
-}
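
TeraGen's RangeInputFormat, removed above, assigned each mapper a contiguous range of row numbers by dividing the total row count as evenly as possible across the configured number of splits. A small sketch of that division (the class name is hypothetical, and long[] pairs stand in for the removed RangeInputSplit):

// Returns {firstRow, rowCount} for each of numSplits splits over totalRows rows,
// using the same ceiling-based division as the removed getSplits().
public class RowDivisionSketch {
  static long[][] divideRows(long totalRows, int numSplits) {
    long[][] splits = new long[numSplits][2];
    long currentRow = 0;
    for (int split = 0; split < numSplits; ++split) {
      long goal = (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
      splits[split][0] = currentRow;         // first row of this split
      splits[split][1] = goal - currentRow;  // number of rows in this split
      currentRow = goal;
    }
    return splits;
  }
}

For example, 10 rows over 3 splits yields ranges of 4, 3, and 3 rows.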
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
deleted file mode 100644
index 6c9b201..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.QuickSort;
-
-/**
- * An input format that reads the first 10 characters of each line as the key
- * and the rest of the line as the value. Both key and value are represented
- * as Text.
- */
-public class TeraInputFormat extends FileInputFormat<Text,Text> {
-
- static final String PARTITION_FILENAME = "_partition.lst";
- private static final String NUM_PARTITIONS =
- "mapreduce.terasort.num.partitions";
- private static final String SAMPLE_SIZE =
- "mapreduce.terasort.partitions.sample";
- static final int KEY_LENGTH = 10;
- static final int VALUE_LENGTH = 90;
- static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
- private static MRJobConfig lastContext = null;
- private static List<InputSplit> lastResult = null;
-
- static class TeraFileSplit extends FileSplit {
- static private String[] ZERO_LOCATIONS = new String[0];
-
- private String[] locations;
-
- public TeraFileSplit() {
- locations = ZERO_LOCATIONS;
- }
- public TeraFileSplit(Path file, long start, long length, String[] hosts) {
- super(file, start, length, hosts);
- try {
- locations = super.getLocations();
- } catch (IOException e) {
- locations = ZERO_LOCATIONS;
- }
- }
-
- // XXXXXX should this also be null-protected?
- protected void setLocations(String[] hosts) {
- locations = hosts;
- }
-
- @Override
- public String[] getLocations() {
- return locations;
- }
-
- public String toString() {
- StringBuffer result = new StringBuffer();
- result.append(getPath());
- result.append(" from ");
- result.append(getStart());
- result.append(" length ");
- result.append(getLength());
- for(String host: getLocations()) {
- result.append(" ");
- result.append(host);
- }
- return result.toString();
- }
- }
-
- static class TextSampler implements IndexedSortable {
- private ArrayList<Text> records = new ArrayList<Text>();
-
- public int compare(int i, int j) {
- Text left = records.get(i);
- Text right = records.get(j);
- return left.compareTo(right);
- }
-
- public void swap(int i, int j) {
- Text left = records.get(i);
- Text right = records.get(j);
- records.set(j, left);
- records.set(i, right);
- }
-
- public void addKey(Text key) {
- synchronized (this) {
- records.add(new Text(key));
- }
- }
-
- /**
- * Find the split points for a given sample. The sample keys are sorted
- * and down sampled to find even split points for the partitions. The
- * returned keys should be the start of their respective partitions.
- * @param numPartitions the desired number of partitions
- * @return an array of size numPartitions - 1 that holds the split points
- */
- Text[] createPartitions(int numPartitions) {
- int numRecords = records.size();
- System.out.println("Making " + numPartitions + " from " + numRecords +
- " sampled records");
- if (numPartitions > numRecords) {
- throw new IllegalArgumentException
- ("Requested more partitions than input keys (" + numPartitions +
- " > " + numRecords + ")");
- }
- new QuickSort().sort(this, 0, records.size());
- float stepSize = numRecords / (float) numPartitions;
- Text[] result = new Text[numPartitions-1];
- for(int i=1; i < numPartitions; ++i) {
- result[i-1] = records.get(Math.round(stepSize * i));
- }
- return result;
- }
- }
-
- /**
- * Use the input splits to take samples of the input and generate sample
- * keys. By default reads 100,000 keys from 10 locations in the input, sorts
- * them and picks N-1 keys to generate N equally sized partitions.
- * @param job the job to sample
- * @param partFile where to write the output file to
- * @throws Throwable if something goes wrong
- */
- public static void writePartitionFile(final JobContext job,
- Path partFile) throws Throwable {
- long t1 = System.currentTimeMillis();
- Configuration conf = job.getConfiguration();
- final TeraInputFormat inFormat = new TeraInputFormat();
- final TextSampler sampler = new TextSampler();
- int partitions = job.getNumReduceTasks();
- long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
- final List<InputSplit> splits = inFormat.getSplits(job);
- long t2 = System.currentTimeMillis();
- System.out.println("Computing input splits took " + (t2 - t1) + "ms");
- int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
- System.out.println("Sampling " + samples + " splits of " + splits.size());
- final long recordsPerSample = sampleSize / samples;
- final int sampleStep = splits.size() / samples;
- Thread[] samplerReader = new Thread[samples];
- SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
- // take N samples from different parts of the input
- for(int i=0; i < samples; ++i) {
- final int idx = i;
- samplerReader[i] =
- new Thread (threadGroup,"Sampler Reader " + idx) {
- {
- setDaemon(true);
- }
- public void run() {
- long records = 0;
- try {
- TaskAttemptContext context = new TaskAttemptContextImpl(
- job.getConfiguration(), new TaskAttemptID());
- RecordReader<Text, Text> reader =
- inFormat.createRecordReader(splits.get(sampleStep * idx),
- context);
- reader.initialize(splits.get(sampleStep * idx), context);
- while (reader.nextKeyValue()) {
- sampler.addKey(new Text(reader.getCurrentKey()));
- records += 1;
- if (recordsPerSample <= records) {
- break;
- }
- }
- } catch (IOException ie){
- System.err.println("Got an exception while reading splits " +
- ExceptionUtils.getStackTrace(ie));
- throw new RuntimeException(ie);
- } catch (InterruptedException e) {
-
- }
- }
- };
- samplerReader[i].start();
- }
- FileSystem outFs = partFile.getFileSystem(conf);
- DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
- outFs.getDefaultBlockSize(partFile));
- for (int i = 0; i < samples; i++) {
- try {
- samplerReader[i].join();
- if(threadGroup.getThrowable() != null){
- throw threadGroup.getThrowable();
- }
- } catch (InterruptedException e) {
- }
- }
- for(Text split : sampler.createPartitions(partitions)) {
- split.write(writer);
- }
- writer.close();
- long t3 = System.currentTimeMillis();
- System.out.println("Computing parititions took " + (t3 - t2) + "ms");
- }
-
- static class SamplerThreadGroup extends ThreadGroup{
-
- private Throwable throwable;
-
- public SamplerThreadGroup(String s) {
- super(s);
- }
-
- @Override
- public void uncaughtException(Thread thread, Throwable throwable) {
- this.throwable = throwable;
- }
-
- public Throwable getThrowable() {
- return this.throwable;
- }
-
- }
-
- static class TeraRecordReader extends RecordReader<Text,Text> {
- private FSDataInputStream in;
- private long offset;
- private long length;
- private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
- private byte[] buffer = new byte[RECORD_LENGTH];
- private Text key;
- private Text value;
-
- public TeraRecordReader() throws IOException {
- }
-
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- Path p = ((FileSplit)split).getPath();
- FileSystem fs = p.getFileSystem(context.getConfiguration());
- in = fs.open(p);
- long start = ((FileSplit)split).getStart();
- // find the offset to start at a record boundary
- offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
- in.seek(start + offset);
- length = ((FileSplit)split).getLength();
- }
-
- public void close() throws IOException {
- in.close();
- }
-
- public Text getCurrentKey() {
- return key;
- }
-
- public Text getCurrentValue() {
- return value;
- }
-
- public float getProgress() throws IOException {
- return (float) offset / length;
- }
-
- public boolean nextKeyValue() throws IOException {
- if (offset >= length) {
- return false;
- }
- int read = 0;
- while (read < RECORD_LENGTH) {
- long newRead = in.read(buffer, read, RECORD_LENGTH - read);
- if (newRead == -1) {
- if (read == 0) {
- return false;
- } else {
- throw new EOFException("read past eof");
- }
- }
- read += newRead;
- }
- if (key == null) {
- key = new Text();
- }
- if (value == null) {
- value = new Text();
- }
- key.set(buffer, 0, KEY_LENGTH);
- value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
- offset += RECORD_LENGTH;
- return true;
- }
- }
-
- @Override
- public RecordReader<Text, Text>
- createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- return new TeraRecordReader();
- }
-
- protected FileSplit makeSplit(Path file, long start, long length,
- String[] hosts) {
- return new TeraFileSplit(file, start, length, hosts);
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- if (job == lastContext) {
- return lastResult;
- }
- long t1, t2, t3;
- t1 = System.currentTimeMillis();
- lastContext = job;
- lastResult = super.getSplits(job);
- t2 = System.currentTimeMillis();
- System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
- if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
- TeraScheduler scheduler = new TeraScheduler(
- lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
- lastResult = scheduler.getNewFileSplits();
- t3 = System.currentTimeMillis();
- System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
- }
- return lastResult;
- }
-}
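
Because terasort records are a fixed 100 bytes (10-byte key, 90-byte value), the removed TeraRecordReader could align any file split to a record boundary with simple modular arithmetic before reading. A sketch of that alignment (class name hypothetical; illustrative only):

// Skip from 'start' to the next 100-byte record boundary, as the removed
// TeraRecordReader.initialize() did before seeking.
public class RecordAlignmentSketch {
  static final int KEY_LENGTH = 10;
  static final int VALUE_LENGTH = 90;
  static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;

  static long alignToRecordBoundary(long start) {
    return (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
  }
}

For example, a split starting at byte 250 skips 50 bytes and begins reading at byte 300.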
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
deleted file mode 100644
index 4ff0f06..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-
-/**
- * An output format that writes the key and value appended together.
- */
-public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
- static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
- private OutputCommitter committer = null;
-
- /**
- * Set the requirement for a final sync before the stream is closed.
- */
- static void setFinalSync(JobContext job, boolean newValue) {
- job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
- }
-
- /**
- * Does the user want a final sync at close?
- */
- public static boolean getFinalSync(JobContext job) {
- return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
- }
-
- static class TeraRecordWriter extends RecordWriter<Text,Text> {
- private boolean finalSync = false;
- private FSDataOutputStream out;
-
- public TeraRecordWriter(FSDataOutputStream out,
- JobContext job) {
- finalSync = getFinalSync(job);
- this.out = out;
- }
-
- public synchronized void write(Text key,
- Text value) throws IOException {
- out.write(key.getBytes(), 0, key.getLength());
- out.write(value.getBytes(), 0, value.getLength());
- }
-
- public void close(TaskAttemptContext context) throws IOException {
- if (finalSync) {
- out.hsync();
- }
- out.close();
- }
- }
-
- @Override
- public void checkOutputSpecs(JobContext job
- ) throws InvalidJobConfException, IOException {
- // Ensure that the output directory is set
- Path outDir = getOutputPath(job);
- if (outDir == null) {
- throw new InvalidJobConfException("Output directory not set in JobConf.");
- }
-
- // get delegation token for outDir's file system
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { outDir }, job.getConfiguration());
- }
-
- public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
- ) throws IOException {
- Path file = getDefaultWorkFile(job, "");
- FileSystem fs = file.getFileSystem(job.getConfiguration());
- FSDataOutputStream fileOut = fs.create(file);
- return new TeraRecordWriter(fileOut, job);
- }
-
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException {
- if (committer == null) {
- Path output = getOutputPath(context);
- committer = new FileOutputCommitter(output, context);
- }
- return committer;
- }
-
-}
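The TeraRecordWriter above writes each key's bytes immediately followed by the value's bytes, with no separator, so the output keeps the fixed-length record layout TeraInputFormat expects. The optional hsync() before close is controlled by the boolean property named in FINAL_SYNC_ATTRIBUTE. A hedged sketch of setting that flag on a job, equivalent to calling TeraOutputFormat.setFinalSync(job, true); the class and job names here are illustrative and not part of the original code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class FinalSyncExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "terasort-example"); // illustrative job name
        // Same effect as TeraOutputFormat.setFinalSync(job, true): request an
        // hsync() on the output stream before it is closed.
        job.getConfiguration().setBoolean("mapreduce.terasort.final.sync", true);
        // TeraRecordWriter.close() then calls out.hsync() before out.close().
        System.out.println("final sync requested: "
            + job.getConfiguration().getBoolean("mapreduce.terasort.final.sync", false));
      }
    }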
http://git-wip-us.apache.org/repos/asf/tez/blob/8e382b34/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java
deleted file mode 100644
index 82638d8..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.terasort;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.tez.mapreduce.examples.terasort.TeraInputFormat.TeraFileSplit;
-
-import com.google.common.base.Charsets;
-
-class TeraScheduler {
- static String USE = "mapreduce.terasort.use.terascheduler";
- private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
- private Split[] splits;
- private List<Host> hosts = new ArrayList<Host>();
- private int slotsPerHost;
- private int remainingSplits = 0;
- private FileSplit[] realSplits = null;
-
- static class Split {
- String filename;
- boolean isAssigned = false;
- List<Host> locations = new ArrayList<Host>();
- Split(String filename) {
- this.filename = filename;
- }
- public String toString() {
- StringBuffer result = new StringBuffer();
- result.append(filename);
- result.append(" on ");
- for(Host host: locations) {
- result.append(host.hostname);
- result.append(", ");
- }
- return result.toString();
- }
- }
- static class Host {
- String hostname;
- List<Split> splits = new ArrayList<Split>();
- Host(String hostname) {
- this.hostname = hostname;
- }
- public String toString() {
- StringBuffer result = new StringBuffer();
- result.append(splits.size());
- result.append(" ");
- result.append(hostname);
- return result.toString();
- }
- }
-
- List<String> readFile(String filename) throws IOException {
- List<String> result = new ArrayList<String>(10000);
- BufferedReader in = new BufferedReader(
- new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8));
- String line = in.readLine();
- while (line != null) {
- result.add(line);
- line = in.readLine();
- }
- in.close();
- return result;
- }
-
- public TeraScheduler(String splitFilename,
- String nodeFilename) throws IOException {
- slotsPerHost = 4;
- // get the hosts
- Map<String, Host> hostIds = new HashMap<String,Host>();
- for(String hostName: readFile(nodeFilename)) {
- Host host = new Host(hostName);
- hosts.add(host);
- hostIds.put(hostName, host);
- }
- // read the blocks
- List<String> splitLines = readFile(splitFilename);
- splits = new Split[splitLines.size()];
- remainingSplits = 0;
- for(String line: splitLines) {
- StringTokenizer itr = new StringTokenizer(line);
- Split newSplit = new Split(itr.nextToken());
- splits[remainingSplits++] = newSplit;
- while (itr.hasMoreTokens()) {
- Host host = hostIds.get(itr.nextToken());
- newSplit.locations.add(host);
- host.splits.add(newSplit);
- }
- }
- }
-
- public TeraScheduler(FileSplit[] realSplits,
- Configuration conf) throws IOException {
- this.realSplits = realSplits;
- this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
- Map<String, Host> hostTable = new HashMap<String, Host>();
- splits = new Split[realSplits.length];
- for(FileSplit realSplit: realSplits) {
- Split split = new Split(realSplit.getPath().toString());
- splits[remainingSplits++] = split;
- for(String hostname: realSplit.getLocations()) {
- Host host = hostTable.get(hostname);
- if (host == null) {
- host = new Host(hostname);
- hostTable.put(hostname, host);
- hosts.add(host);
- }
- host.splits.add(split);
- split.locations.add(host);
- }
- }
- }
-
- Host pickBestHost() {
- Host result = null;
- int splits = Integer.MAX_VALUE;
- for(Host host: hosts) {
- if (host.splits.size() < splits) {
- result = host;
- splits = host.splits.size();
- }
- }
- if (result != null) {
- hosts.remove(result);
- LOG.debug("picking " + result);
- }
- return result;
- }
-
- void pickBestSplits(Host host) {
- int tasksToPick = Math.min(slotsPerHost,
- (int) Math.ceil((double) remainingSplits /
- hosts.size()));
- Split[] best = new Split[tasksToPick];
- for(Split cur: host.splits) {
- LOG.debug(" examine: " + cur.filename + " " + cur.locations.size());
- int i = 0;
- while (i < tasksToPick && best[i] != null &&
- best[i].locations.size() <= cur.locations.size()) {
- i += 1;
- }
- if (i < tasksToPick) {
- for(int j = tasksToPick - 1; j > i; --j) {
- best[j] = best[j-1];
- }
- best[i] = cur;
- }
- }
- // for the chosen blocks, remove them from the other locations
- for(int i=0; i < tasksToPick; ++i) {
- if (best[i] != null) {
- LOG.debug(" best: " + best[i].filename);
- for (Host other: best[i].locations) {
- other.splits.remove(best[i]);
- }
- best[i].locations.clear();
- best[i].locations.add(host);
- best[i].isAssigned = true;
- remainingSplits -= 1;
- }
- }
- // for the non-chosen blocks, remove this host
- for(Split cur: host.splits) {
- if (!cur.isAssigned) {
- cur.locations.remove(host);
- }
- }
- }
-
- void solve() throws IOException {
- Host host = pickBestHost();
- while (host != null) {
- pickBestSplits(host);
- host = pickBestHost();
- }
- }
-
- /**
- * Solve the schedule and modify the FileSplit array to reflect the new
- * schedule. It will move placed splits to the front and unplaceable splits
- * to the end.
- * @return a new list of FileSplits that are modified to have the
- * best host as the only host.
- * @throws IOException
- */
- public List<InputSplit> getNewFileSplits() throws IOException {
- solve();
- FileSplit[] result = new FileSplit[realSplits.length];
- int left = 0;
- int right = realSplits.length - 1;
- for(int i=0; i < splits.length; ++i) {
- if (splits[i].isAssigned) {
- // copy the split and fix up the locations
- ((TeraFileSplit) realSplits[i]).setLocations
- (new String[]{splits[i].locations.get(0).hostname});
- result[left++] = realSplits[i];
- } else {
- result[right--] = realSplits[i];
- }
- }
- List<InputSplit> ret = new ArrayList<InputSplit>();
- for (FileSplit fs : result) {
- ret.add(fs);
- }
- return ret;
- }
-
- public static void main(String[] args) throws IOException {
- TeraScheduler problem = new TeraScheduler("block-loc.txt", "nodes");
- for(Host host: problem.hosts) {
- System.out.println(host);
- }
- LOG.info("starting solve");
- problem.solve();
- List<Split> leftOvers = new ArrayList<Split>();
- for(int i=0; i < problem.splits.length; ++i) {
- if (problem.splits[i].isAssigned) {
- System.out.println("sched: " + problem.splits[i]);
- } else {
- leftOvers.add(problem.splits[i]);
- }
- }
- for(Split cur: leftOvers) {
- System.out.println("left: " + cur);
- }
- System.out.println("left over: " + leftOvers.size());
- LOG.info("done");
- }
-
-}
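The solve() loop above is a greedy heuristic: pickBestHost() repeatedly selects the host with the fewest remaining splits, pickBestSplits() assigns it up to slotsPerHost of its most location-constrained splits, and those splits are then removed from every other host's list. A small self-contained sketch of the host-selection step, using plain collections instead of the Hadoop types; all names here are illustrative.

    import java.util.ArrayList;
    import java.util.List;

    public class GreedyHostPick {
      static class Host {
        final String name;
        final List<String> splits = new ArrayList<>();
        Host(String name) { this.name = name; }
      }

      /** Pick the host with the fewest remaining splits, as pickBestHost() does. */
      static Host pickBestHost(List<Host> hosts) {
        Host best = null;
        for (Host h : hosts) {
          if (best == null || h.splits.size() < best.splits.size()) {
            best = h;
          }
        }
        if (best != null) {
          hosts.remove(best); // each host is scheduled at most once per pass
        }
        return best;
      }

      public static void main(String[] args) {
        List<Host> hosts = new ArrayList<>();
        Host a = new Host("node-a"); a.splits.add("part-0"); a.splits.add("part-1");
        Host b = new Host("node-b"); b.splits.add("part-2");
        hosts.add(a); hosts.add(b);
        // node-b has the fewest splits, so it is picked first.
        System.out.println(pickBestHost(hosts).name); // node-b
      }
    }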