Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [29/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex Thu Apr 18 23:54:18 2013
@@ -0,0 +1,370 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+\documentclass{article}
+\usepackage[pdftex]{hyperref}
+\usepackage[pdftex]{graphicx}
+
+\title{Winning a 60 Second Dash with a Yellow Elephant}
+\author{\href{http://people.apache.org/~omalley}{Owen O'Malley} and 
+        \href{http://people.apache.org/~acmurthy}{Arun C. Murthy}\\
+\href{http://www.yahoo.com/}{Yahoo!}\\
+owen@yahoo-inc.com and acm@yahoo-inc.com}
+\date{April 2009}
+\begin{document}
+\maketitle
+\href{http://hadoop.apache.org/core}{Apache Hadoop} is an open source
+software framework that dramatically simplifies writing distributed
+data intensive applications. It provides a distributed file system,
+which is modeled after the Google File System\cite{gfs}, and a
+map/reduce\cite{mapreduce} implementation that manages distributed
+computation. Jim Gray defined a benchmark to compare large sorting
+programs. Since the core of map/reduce is a distributed sort, most of
+the custom code is glue to get the desired behavior.
+
+\section{Benchmark Rules}
+
+Jim Gray's sort benchmark consists of a set of many related
+benchmarks, each with its own rules. All of the sort benchmarks
+measure the time to sort different numbers of 100 byte records. The
+first 10 bytes of each record are the key and the rest is the
+value. The \textbf{minute sort} must finish end to end in less than a
+minute. The \textbf{Gray sort} must sort more than 100 terabytes and
+must run for at least an hour.
+
+\begin{itemize}
+\item The input data must precisely match the data generated by the C
+  data generator.
+\item The input must not be in the operating system's file
+  cache when the job starts. Under Linux, this requires using the memory for something
+  else between sorting runs.
+\item The input and output data must not be compressed.
+\item The output must not overwrite the input.
+\item The output must be synced to disk.
+\item The 128 bit sum of the crc32's of each key/value pair must be
+  calculated for the input and output. Naturally, they must be
+  identical.
+\item The output may be divided into multiple output files, but it
+  must be totally ordered (simply concatenating the output files must
+  produce the completely sorted output).
+\item Starting and distributing the application to the cluster must be
+  included in the execution time.
+\item Any sampling must be included in the execution time.
+\end{itemize}
+
+\section{Hadoop implementation}
+
+We extended the programs from last year to create and manipulate the
+new binary format and match the new rules. There are now 4 Hadoop
+map/reduce applications to support the benchmark:
+\begin{enumerate}
+\item \textbf{TeraGen} is a map/reduce program to generate the data.
+\item \textbf{TeraSort} samples the input data and uses map/reduce to
+  sort the data into a total order.
+\item \textbf{TeraSum} is a map/reduce program that computes the 128 bit
+  sum of the crc32 of each key/value pair.
+\item \textbf{TeraValidate} is a map/reduce program that validates that the
+  output is sorted and computes the same sum of checksums as TeraSum.
+\end{enumerate}
+The update to the terasort programs will be checked in as
+\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716}.
+
+\textbf{TeraGen} generates input data for the sort that is byte for byte
+equivalent to the C version that was released in March of 2009,
+including specific keys and values. It divides the desired number of
+rows by the desired number of tasks and assigns ranges of rows to each
+map. The map jumps the random number generator to the correct value
+for the first row and generates the following rows.
+
+\textbf{TeraSort} is a standard map/reduce sort, except for a custom
+partitioner that ensures that all of the keys in reduce $N$ are after
+all of the keys in reduce $N-1$. This is a requirement of the contest
+so that the output of the sort is totally ordered, even if it is
+divided up by reduce.
+
+We wrote an input and output format, used by all 4 applications to
+read and write the files in the new format.
+
+\textbf{TeraSum} computes the 128 bit sum of the CRC32 of each
+key/value pair. Each map computes the sum of its input and emits a
+single 128 bit sum. There is a single reduce that adds the sums from
+each map. We used this program on the input directory to calculate the
+sum of the checksums of each key/value pair to check the correctness
+of the output of the sort. We also used TeraSum on a distinct dataset
+that was larger than the total RAM in the cluster to flush the Linux
+file cache between runs of the small (500 GB and 1TB) sorts.
+
+\textbf{TeraValidate} ensures that the output is globally sorted. It
+creates one map per file in the output directory and each map
+ensures that each key is greater than or equal to the previous one. The
+map also generates records with the first and last keys of the file
+and the reduce ensures that the first key of file $i$ is greater than
+the last key of file $i-1$. Any problems are reported as output of the
+reduce with the keys that are out of order. Additionally, TeraValidate
+calculates the sum of checksums of the output directory.
+
+\section{Hardware and Operating System}
+
+We ran our benchmarks on Yahoo's Hammer cluster. Hammer's hardware is
+very similar to the hardware that we used in last year's terabyte
+sort. The hardware and operating system details are:
+
+\begin{itemize}
+\item approximately 3800 nodes (in such a large cluster, nodes are
+  always down)
+\item 2 quad core Xeons @ 2.5 GHz per node
+\item 4 SATA disks per node
+\item 8 GB RAM per node (upgraded to 16 GB before the petabyte sort)
+\item 1 gigabit ethernet on each node
+\item 40 nodes per rack
+\item 8 gigabit ethernet uplinks from each rack to the core
+\item Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+\item Sun Java JDK (1.6.0\_05-b13 and 1.6.0\_13-b03) (32 and 64 bit)
+\end{itemize}
+
+We hit a JVM bug in 1.6.0\_05-b13 on the larger sorts (100TB and 1PB)
+and switched over to the later JVM, which resolved the issue. For the
+larger sorts, we used 64 bit JVMs for the Name Node and Job Tracker.
+
+\section{Software and Configuration}
+
+The version of Hadoop we used was a private branch of trunk that was
+started in January 2009, which is after the 0.20 branch was feature
+frozen. We used git to manage our branch and it allowed us to easily
+coordinate our work, track our changes, and resynchronize with the
+current Hadoop trunk.
+
+The changes include:
+
+\begin{enumerate}
+
+\item Updated the terasort example in the Hadoop code base to match
+  the dataset defined by the rule changes in the benchmark from March
+  of 2009.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716})
+
+\item We reimplemented the reducer side of Hadoop's shuffle. The
+  redesign improved the performance of the shuffle and removed
+  bottlenecks and over-throttling. It also made the code more
+  maintainable and understandable by breaking a 3000 line Java file
+  into multiple classes with a clean set of interfaces.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5223}{HADOOP-5223})
+
+\item The new shuffle also fetches multiple map outputs from the same
+  node over each connection rather than one at a time. Fetching
+  multiple map outputs at the same time avoids connection setup costs
+  and also avoids the round trip while the server responds to the request.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-1338}{HADOOP-1338})
+  
+\item Allowed configuring timeouts on the shuffle connections and we
+  shortened them for the small sorts. We observed cases where the
+  connections for the shuffle would hang until the timeout, which made
+  low latency jobs impossibly long.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5789}{HADOOP-5789})
+
+\item Set TCP no-delay and more frequent pings between the Task and
+  the Task Tracker to reduce latency in detecting problems.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5788}{HADOOP-5788})
+
+\item We added protection code that detects incorrect data being
+  transmitted in the shuffle and keeps it from causing the reduce to
+  fail. The corruption appears to be either a JVM NIO bug or a Jetty
+  bug that likely affects 0.20 and trunk under heavy load.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5783}{HADOOP-5783})
+
+\item We used LZO compression on the map outputs. On the new dataset, LZO
+  compresses down to 45\% of the original size. By comparison, the
+  dataset from last year compresses to 20\% of the original size. Last
+  year, the shuffle would run out of direct buffers if we used
+  compression on the map outputs.
+
+\item We implemented memory to memory merges in the reduce during the
+  shuffle to combine the map outputs in memory before we finish the
+  shuffle, thereby reducing the work needed when the reduce is
+  running.
+
+\item We multi-threaded the sampling code that read the input set to
+  find the partition points between the reduces. We also wrote a
+  simple partitioner that assumes the keys are evenly
+  distributed. Since the new dataset does not require sampling, the
+  simple partitioner produces very even partitions.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-4946}{HADOOP-4946})
+
+\item On the smaller clusters, we configured the system with faster
+  heartbeat cycles from the Task Trackers to the Job Tracker (it
+  defaults to 10 secs / 1000 nodes, but we made it configurable and
+  brought it down to 2 seconds/1000 nodes to provide lower latency)
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5784}{HADOOP-5784})
+
+\item Typically the Job Tracker assigns tasks to Task Trackers on a
+  first come first served basis. This greedy assignment of tasks did
+  not lead to good data locality. However, by taking a global view and
+  placing all of the map tasks at once, the system achieves much better
+  locality. Rather than implement global scheduling for all of Hadoop,
+  which would be much harder, we implemented a global scheduler for
+  the terasort example in the input format. Basically, the input
+  format computes the splits and assigns work to the nodes that have
+  the fewest blocks first. For a node that has more blocks
+  than map slots, it picks the blocks that have the fewest remaining
+  locations. This greedy global algorithm seems to get very good
+  locality. The input format would schedule the maps and then change
+  the input split descriptions to only have a single location instead
+  of the original 3. This increased task locality by 40\% or so over
+  the greedy scheduler.
+
+\item Hadoop 0.20 added setup and cleanup tasks. Since they are not
+  required for the sort benchmarks, we allow them to be disabled to
+  reduce the latency of starting and stopping the job.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5785}{HADOOP-5785})
+
+\item We discovered a performance problem where in some contexts the
+  cost of using the JNI-based CRC32 was very high. By implementing it
+  in pure Java, the average case is a little slower, but the worst
+  case is much better.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5598}{HADOOP-5598})
+
+\item We found and removed some hard-coded wait loops from the
+  framework that don't matter for large jobs, but can seriously slow
+  down low latency jobs.
+
+\item Allowed setting the logging level for the tasks, so that we
+  could cut down on logging. When running for ``real'' we configure the
+  logging level to WARN instead of the default INFO. Reducing the
+  amount of logging has a huge impact on the performance of the
+  system, but obviously makes debugging and analysis much harder.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5786}{HADOOP-5786})
+
+\item One optimization that we didn't finish is to optimize the job
+  planning code. Currently, it uses an RPC to the Name Node for each
+  input file, which we have observed taking a substantial amount of
+  time. For the terabyte sort, our investigations show that we
+  could save about 4 seconds out of the 8 that were spent on setting
+  up the job.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5795}{HADOOP-5795})
+
+\end{enumerate}
+
+\section{Results}
+
+Hadoop has made a lot of progress in the last year and we were able to
+run much lower latency jobs as well as much larger jobs. Note that in
+any large cluster and distributed application, there are a lot of
+moving pieces and thus we have seen a wide variation in execution
+times. As Hadoop evolves and becomes more graceful in the presence of
+hardware degradation and failure, this variation should smooth
+out. The best times for each of the listed sort sizes were:
+\\
+
+\begin{tabular}{| c | c | c | c | c | c |}
+\hline
+Bytes & Nodes & Maps & Reduces & Replication & Time \\
+\hline
+$5*10^{11}$ & 1406 & 8000 & 2600 & 1 & 59 seconds \\
+$10^{12}$ & 1460 & 8000 & 2700 & 1 & 62 seconds \\
+$10^{14}$ & 3452 & 190,000 & 10,000 & 2 & 173 minutes \\
+$10^{15}$ & 3658 & 80,000 & 20,000 & 2 & 975 minutes \\
+\hline
+\end{tabular}\\
+
+Within the rules for the 2009 Gray sort, our 500 GB sort set a new
+record for the minute sort and the 1PB sort set a new record of 1.03
+TB/minute. The 62 second terabyte sort would have set a new record,
+but the terabyte benchmark that we won last year has been
+retired. (Clearly the minute sort and terabyte sort are rapidly
+converging, and thus it is not a loss.)  One piece of trivia is that
+only the petabyte dataset had any duplicate keys (40 of them).
+
+Because the smaller sorts needed lower latency and faster network, we
+only used part of the cluster for those runs. In particular, instead
+of our normal 5:1 over subscription between racks, we limited it to 16
+nodes in each rack for a 2:1 over subscription. The smaller runs can
+also use output replication of 1. Because they only take minutes to
+run and run on smaller clusters, the likelihood of a node failing is
+fairly low. On the larger runs, failure is expected and thus
+replication of 2 is required. HDFS protects against data loss during
+rack failure by writing the second replica on a different rack and
+thus writing the second replica is relatively slow.
+
+We've included the timelines for the jobs counting from the job
+submission at the Job Tracker. The diagrams show the number of tasks
+running at each point in time. While maps only have a single phase,
+the reduces have three: \textbf{shuffle}, \textbf{merge}, and
+\textbf{reduce}. The shuffle is the transfer of the data from the
+maps. Merge doesn't happen in these benchmarks, because none of the
+reduces need multiple levels of merges. Finally, the reduce phase is
+where the final merge and writing to HDFS happens. We've also included
+a category named \textbf{waste} that represents task attempts that
+were running, but ended up either failing, or being killed (often as
+speculatively executed task attempts). The job logs and configuration
+for the four runs, which are the raw data for the charts, are
+available on
+\href{http://people.apache.org/~omalley/tera-2009/}{http://people.apache.org/~omalley/tera-2009/}.
+
+If you compare this year's charts to last year's, you'll notice that
+tasks are launching much faster now. Last year we only launched one
+task per heartbeat, so it took 40 seconds to get all of the tasks
+launched. Now, Hadoop will fill up a Task Tracker in a single
+heartbeat. Reducing that job launch overhead is very important
+for getting runs under a minute.
+
+As with last year, we ran with significantly larger tasks than the
+defaults for Hadoop. Even with the new more aggressive shuffle,
+minimizing the number of transfers (maps * reduces) is very important
+to the performance of the job. Notice that in the petabyte sort, each
+map is processing 15 GB instead of the default 128 MB and each reduce
+is handling 50 GB. When we ran the petabyte with a more typical value
+of 1.5 GB / map, it took 40 hours to finish. Therefore, to increase
+throughput, it makes sense to consider increasing the default block
+size, which translates into the default map size, to at least 1
+GB.
+
+\section{Comments on the Rule Changes}
+
+The group that runs the Gray Sort Benchmark made very substantial
+changes to the rules this year. The changes were not announced, but
+rather appeared on the website in March. We feel that it was too late
+to make rule changes and that the benchmark should have been changed
+next year. We'd also like to point out that while most of the changes to
+the data generator were positive, it was a poor choice to remove the
+skewed distribution of the keys. The previously skewed distribution
+required sampling of the input to pick good partition points between
+the reduces. The current dataset picks keys so completely at random that
+sampling is counterproductive and yields less even distributions between the
+reduces.
+
+\bibliographystyle{abbrv}
+\bibliography{tera}
+
+\begin{figure}[!p]
+\includegraphics[width=4.21in]{500GBTaskTime.png}
+\caption{500 GB sort tasks across time}\label{500GbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1TBTaskTime.png}
+\caption{1 TB sort tasks across time}\label{1TbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{100TBTaskTime.png}
+\caption{100 TB sort tasks across time}\label{100TbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1PBTaskTime.png}
+\caption{1 PB sort tasks across time}\label{1PbTimeline}
+\end{figure} 
+
+\end{document}
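Editor's sketch (not part of the commit): the TeraSum description in the write-up above, a 128 bit sum of the CRC32 of each key/value pair with one partial sum per map and a single reduce that adds them, could look roughly like the following. The class and method names are hypothetical, java.util.zip.CRC32 stands in for Hadoop's PureJavaCrc32, and BigInteger reduced modulo 2^128 stands in for the Unsigned16 type used by the committed code.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical illustration of a TeraSum-style checksum; not the committed code. */
public class ChecksumSumSketch {
  // Sums are kept modulo 2^128 to mimic a 128 bit accumulator.
  private static final BigInteger MOD_128 = BigInteger.ONE.shiftLeft(128);

  /** "Map" side: add up the CRC32 of every record in one input split. */
  public static BigInteger sumOfCrc32(List<byte[]> records) {
    BigInteger sum = BigInteger.ZERO;
    CRC32 crc = new CRC32();
    for (byte[] record : records) {
      crc.reset();
      crc.update(record, 0, record.length);
      sum = sum.add(BigInteger.valueOf(crc.getValue())).mod(MOD_128);
    }
    return sum;
  }

  /** "Reduce" side: add the partial sums emitted by the maps. */
  public static BigInteger combine(List<BigInteger> partialSums) {
    BigInteger total = BigInteger.ZERO;
    for (BigInteger partial : partialSums) {
      total = total.add(partial).mod(MOD_128);
    }
    return total;
  }

  public static void main(String[] args) {
    byte[] first = "record one".getBytes(StandardCharsets.US_ASCII);
    byte[] second = "record two".getBytes(StandardCharsets.US_ASCII);
    BigInteger whole = sumOfCrc32(Arrays.asList(first, second));
    BigInteger split = combine(Arrays.asList(
        sumOfCrc32(Arrays.asList(first)),
        sumOfCrc32(Arrays.asList(second))));
    // Addition is commutative, so the split and unsplit sums agree.
    System.out.println(whole.equals(split));
  }
}
```

Because addition does not depend on order, the sum over the sorted output matches the sum over the unsorted input whenever no record was lost, duplicated, or altered, which is what makes it usable as a cheap correctness check.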

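Editor's sketch (not part of the commit): the write-up mentions a simple partitioner that assumes evenly distributed keys and must keep every key in reduce N after every key in reduce N-1. One way to get both properties is to scale a fixed-width key prefix into the range of reduces. The class name, the method signature, and the choice of a 3 byte prefix are assumptions made for illustration; the write-up does not say how the actual partitioner works.

```java
/** Hypothetical range partitioner for uniformly distributed binary keys. */
public class EvenPartitionSketch {
  /** Number of leading key bytes used as the prefix (an assumed value). */
  private static final int PREFIX_BYTES = 3;

  /**
   * Maps a key to a reduce index in [0, numReduces). The mapping is
   * monotonic in the unsigned byte order of the key prefix, so the
   * reduce outputs can be concatenated into one totally ordered file.
   */
  public static int getPartition(byte[] key, int numReduces) {
    long prefix = 0;
    for (int i = 0; i < PREFIX_BYTES; i++) {
      // Mask to treat each byte as unsigned.
      prefix = (prefix << 8) | (key[i] & 0xFF);
    }
    // prefix lies in [0, 2^24); scale it into [0, numReduces).
    return (int) ((prefix * numReduces) >>> (8 * PREFIX_BYTES));
  }

  public static void main(String[] args) {
    byte[] lowest = new byte[10];
    byte[] highest = new byte[10];
    java.util.Arrays.fill(highest, (byte) 0xFF);
    System.out.println(getPartition(lowest, 2700));
    System.out.println(getPartition(highest, 2700));
  }
}
```

With uniformly random keys each reduce receives close to the same share of records without any sampling pass. With skewed keys this scheme would produce unbalanced reduces, which is why the earlier dataset needed sampled partition points.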
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib Thu Apr 18 23:54:18 2013
@@ -0,0 +1,31 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+@INPROCEEDINGS{mapreduce,
+	AUTHOR = "Jeffrey Dean and Sanjay Ghemawat",
+	TITLE = "MapReduce: Simplified Data Processing on Large Clusters",
+	BOOKTITLE = "Sixth Symposium on Operating System Design and Implementation",
+	MONTH = "December", 
+        ADDRESS = "San Francisco, CA",
+	YEAR = {2004}	}
+
+@INPROCEEDINGS{gfs,
+	AUTHOR = "Sanjay Ghemawat and Howard Gobioff and Shun-Tak Leung",
+	TITLE = "The Google File System",
+	BOOKTITLE = "19th Symposium on Operating Systems Principles",
+        ORGANIZATION = "ACM",
+	MONTH = "October", 
+        ADDRESS = "Lake George, NY",
+	YEAR = {2003}	}

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+
+/** 
+ * A single process data generator for the terasort data. Based on gensort.c 
+ * version 1.1 (3 Mar 2009) from Chris Nyberg <ch...@ordinal.com>.
+ */
+public class GenSort {
+
+  /**
+   * Generate a "binary" record suitable for all sort benchmarks *except* 
+   * PennySort.
+   */
+  static void generateRecord(byte[] recBuf, Unsigned16 rand, 
+                                     Unsigned16 recordNumber) {
+    /* generate the 10-byte key using the high 10 bytes of the 128-bit
+     * random number
+     */
+    for(int i=0; i < 10; ++i) {
+      recBuf[i] = rand.getByte(i);
+    }
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = 0x00;
+    recBuf[11] = 0x11;
+
+    /* convert the 128-bit record number to 32 digits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[44] = (byte) 0x88;
+    recBuf[45] = (byte) 0x99;
+    recBuf[46] = (byte) 0xAA;
+    recBuf[47] = (byte) 0xBB;
+
+    /* add 48 bytes of filler based on low 48 bits of random number */
+    for(int i=0; i < 12; ++i) {
+      recBuf[48+i*4] = recBuf[49+i*4] = recBuf[50+i*4] = recBuf[51+i*4] =
+        (byte) rand.getHexDigit(20 + i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[96] = (byte) 0xCC;
+    recBuf[97] = (byte) 0xDD;
+    recBuf[98] = (byte) 0xEE;
+    recBuf[99] = (byte) 0xFF;
+  }
+
+
+  private static BigInteger makeBigInteger(long x) {
+    byte[] data = new byte[8];
+    for(int i=0; i < 8; ++i) {
+      data[i] = (byte) (x >>> (56 - 8*i));
+    }
+    return new BigInteger(1, data);
+  }
+
+  private static final BigInteger NINETY_FIVE = new BigInteger("95");
+
+  /**
+   * Generate an ascii record suitable for all sort benchmarks including 
+   * PennySort.
+   */
+  static void generateAsciiRecord(byte[] recBuf, Unsigned16 rand, 
+                                  Unsigned16 recordNumber) {
+
+    /* generate the 10-byte ascii key using mostly the high 64 bits.
+     */
+    long temp = rand.getHigh8();
+    if (temp < 0) {
+      // use biginteger to avoid the negative sign problem
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[0] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[0] = (byte) (' ' + (temp % 95));
+      temp /= 95;      
+    }
+    for(int i=1; i < 8; ++i) {
+      recBuf[i] = (byte) (' ' + (temp % 95));
+      temp /= 95;      
+    }
+    temp = rand.getLow8();
+    if (temp < 0) {
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[8] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();      
+    } else {
+      recBuf[8] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    recBuf[9] = (byte)(' ' + (temp % 95));
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = ' ';
+    recBuf[11] = ' ';
+
+    /* convert the 128-bit record number to 32 digits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[44] = ' ';
+    recBuf[45] = ' ';
+
+    /* add 52 bytes of filler based on low 52 bits of random number */
+    for(int i=0; i < 13; ++i) {
+      recBuf[46+i*4] = recBuf[47+i*4] = recBuf[48+i*4] = recBuf[49+i*4] =
+        (byte) rand.getHexDigit(19 + i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[98] = '\r';  /* nice for Windows */
+    recBuf[99] = '\n';
+  }
+
+
+  private static void usage() {
+    PrintStream out = System.out;
+    out.println("usage: gensort [-a] [-c] [-bSTARTING_REC_NUM] NUM_RECS FILE_NAME");
+    out.println("-a        Generate ascii records required for PennySort or JouleSort.");
+    out.println("          These records are also an alternative input for the other");
+    out.println("          sort benchmarks.  Without this flag, binary records will be");
+    out.println("          generated that contain the highest density of randomness in");
+    out.println("          the 10-byte key.");
+    out.println("-c        Calculate the sum of the crc32 checksums of each of the");
+    out.println("          generated records and send it to standard error.");
+    out.println("-bN       Set the beginning record generated to N. By default the");
+    out.println("          first record generated is record 0.");
+    out.println("NUM_RECS  The number of sequential records to generate.");
+    out.println("FILE_NAME The name of the file to write the records to.\n");
+    out.println("Example 1 - to generate 1000000 ascii records starting at record 0 to");
+    out.println("the file named \"pennyinput\":");
+    out.println("    gensort -a 1000000 pennyinput\n");
+    out.println("Example 2 - to generate 1000 binary records beginning with record 2000");
+    out.println("to the file named \"partition2\":");
+    out.println("    gensort -b2000 1000 partition2");
+    System.exit(1);
+  }
+
+
+  public static void outputRecords(OutputStream out,
+                                   boolean useAscii,
+                                   Unsigned16 firstRecordNumber,
+                                   Unsigned16 recordsToGenerate,
+                                   Unsigned16 checksum
+                                   ) throws IOException {
+    byte[] row = new byte[100];
+    Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
+    Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
+    Checksum crc = new PureJavaCrc32();
+    Unsigned16 tmp = new Unsigned16();
+    lastRecordNumber.add(recordsToGenerate);
+    Unsigned16 ONE = new Unsigned16(1);
+    Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
+    while (!recordNumber.equals(lastRecordNumber)) {
+      Random16.nextRand(rand);
+      if (useAscii) {
+        generateAsciiRecord(row, rand, recordNumber);
+      } else {
+        generateRecord(row, rand, recordNumber);
+      }
+      if (checksum != null) {
+        crc.reset();
+        crc.update(row, 0, row.length);
+        tmp.set(crc.getValue());
+        checksum.add(tmp);
+      }
+      recordNumber.add(ONE);
+      out.write(row);
+    }
+  }
+                                   
+  public static void main(String[] args) throws Exception {
+    Unsigned16 startingRecord = new Unsigned16();
+    Unsigned16 numberOfRecords;
+    OutputStream out;
+    boolean useAscii = false;
+    Unsigned16 checksum = null;
+
+    int i;
+    for(i=0; i < args.length; ++i) {
+      String arg = args[i];
+      int argLength = arg.length();
+      if (argLength >= 1 && arg.charAt(0) == '-') {
+        if (argLength < 2) {
+          usage();
+        }
+        switch (arg.charAt(1)) {
+        case 'a':
+          useAscii = true;
+          break;
+        case 'b':
+          startingRecord = Unsigned16.fromDecimal(arg.substring(2));
+          break;
+        case 'c':
+          checksum = new Unsigned16();
+          break;
+        default:
+          usage();
+        }
+      } else {
+        break;
+      }
+    }
+    if (args.length - i != 2) {
+      usage();
+    }
+    numberOfRecords = Unsigned16.fromDecimal(args[i]);
+    out = new FileOutputStream(args[i+1]);
+
+    outputRecords(out, useAscii, startingRecord, numberOfRecords, checksum);
+    out.close();
+    if (checksum != null) {
+      System.out.println(checksum);
+    }
+  }
+
+}

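Editor's sketch (not part of the commit): generateAsciiRecord in GenSort.java above switches to BigInteger when the 64 bit value is negative, because Java's long is signed and `%` would then yield a negative remainder. The standalone class below (hypothetical name and method names) reproduces that conversion and compares it with Long.remainderUnsigned, which the standard library has offered since Java 8 as an alternative way to express the same step.

```java
import java.math.BigInteger;

/** Hypothetical illustration of unsigned base-95 digit extraction. */
public class UnsignedBase95Sketch {
  private static final BigInteger NINETY_FIVE = BigInteger.valueOf(95);

  /** Same idea as makeBigInteger: read the 64 bits as an unsigned value. */
  public static BigInteger toUnsignedBigInteger(long x) {
    byte[] data = new byte[8];
    for (int i = 0; i < 8; ++i) {
      data[i] = (byte) (x >>> (56 - 8 * i));
    }
    // Signum 1 tells BigInteger to treat the bytes as a positive magnitude.
    return new BigInteger(1, data);
  }

  /** First printable digit computed through BigInteger. */
  public static char firstDigitBigInteger(long x) {
    return (char) (' ' + toUnsignedBigInteger(x).mod(NINETY_FIVE).longValue());
  }

  /** The same digit computed with the Java 8 unsigned helper. */
  public static char firstDigitUnsignedLong(long x) {
    return (char) (' ' + Long.remainderUnsigned(x, 95));
  }

  public static void main(String[] args) {
    long negative = -1L; // all 64 bits set
    System.out.println(negative % 95 < 0);
    System.out.println(
        firstDigitBigInteger(negative) == firstDigitUnsignedLong(negative));
  }
}
```

Either route keeps the digit inside the 95 printable ASCII characters starting at the space character; the signed remainder would fall below that range for half of all inputs.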
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has already been generated),
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ *            or 0x2360ed051fc65da44385df649fccf645
+ *   and c is 98910279301475397889117759788405497857
+ *            or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on 1.0 of rand16.c from Chris Nyberg 
+ * <ch...@ordinal.com>.
+ */
+class Random16 {
+
+  /** 
+   * The "Gen" array contains powers of 2 of the linear congruential generator.
+   * The index 0 struct contains the "a" coefficient and "c" constant for the
+   * generator.  That is, the generator is:
+   *    f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128
+   *
+   * All structs after the first contain an "a" and "c" that
+   * comprise the square of the previous function.
+   *
+   * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128
+   * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128
+   * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128
+   * ...
+
+   */
+  private static class RandomConstant {
+    final Unsigned16 a;
+    final Unsigned16 c;
+    public RandomConstant(String left, String right) {
+      a = new Unsigned16(left);
+      c = new Unsigned16(right);
+    }
+  }
+
+  private static final RandomConstant[] genArray = new RandomConstant[]{
+    /* [  0] */ new RandomConstant("2360ed051fc65da44385df649fccf645", 
+                                   "4a696d47726179524950202020202001"),
+    /* [  1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99", 
+                                   "95e0e48262b3edfe04479485c755b646"),
+    /* [  2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771", 
+                                   "882a02c315362b60765f100068b33a1c"),
+    /* [  3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1", 
+                                   "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+    /* [  4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1", 
+                                   "f25bd15439d16af594c1b1bafa6239f0"),
+    /* [  5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781", 
+                                   "89ca67c29c9397d59c612596145db7e0"),
+    /* [  6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01", 
+                                   "8b6ae036713bd578a8093c8eae5c7fc0"),
+    /* [  7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01", 
+                                   "98a2542fd23d0dbdff3b886cdb1d3f80"),
+    /* [  8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01", 
+                                   "954db923fdb7933e947cd1edcecb7f00"),
+    /* [  9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801", 
+                                   "00be4a36657c98cd204e8c8af7dafe00"),
+    /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001", 
+                                   "991965329dccb28d581199ab18c5fc00"),
+    /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001", 
+                                   "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+    /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001", 
+                                   "2b657bbfd6ed9d632079e70c3c97f000"),
+    /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+                                   "59b60ee4c52fa49e9fe90682bd2fe000"),
+    /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001", 
+                                   "cc099c88030679464fe86aae8a5fc000"),
+    /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001", 
+                                   "06b9abff9f9f33dd30362c0154bf8000"),
+    /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001", 
+                                   "e296707121688d5a0260b293a97f0000"),
+    /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001", 
+                                   "189ffc4701ff23cb8f8acf6b52fe0000"),
+    /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001", 
+                                   "5141110ab208fb9d61fb47e6a5fc0000"),
+    /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001", 
+                                   "3c97caa62540f2948d8d340d4bf80000"),
+    /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001", 
+                                   "1b25cb9cfe5a0c963174f91a97f00000"),
+    /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001", 
+                                   "0c644570b4a487103c5436352fe00000"),
+    /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001", 
+                                   "3d0589c28869472bde517c6a5fc00000"),
+    /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001", 
+                                   "bc95e5ab36477e65534738d4bf800000"),
+    /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001", 
+                                   "ddb02ff72a031c01011f71a97f000000"),
+    /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001", 
+                                   "2561426086d9acdb6c82e352fe000000"),
+    /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001", 
+                                   "64a788e3c118ed1c8215c6a5fc000000"),
+    /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001", 
+                                   "e65ea321908627cfa86b8d4bf8000000"),
+    /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001", 
+                                   "53d27225604d85f9e1d71a97f0000000"),
+    /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001", 
+                                   "ca5ec7a3ed1fe55e07ae352fe0000000"),
+    /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001", 
+                                   "4daebb2e085330651f5c6a5fc0000000"),
+    /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001", 
+                                   "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+    /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001", 
+                                   "158c62f2b31e496dfd71a97f00000000"),
+    /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001", 
+                                   "290e84a2eb15fd1ffae352fe00000000"),
+    /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001", 
+                                   "e3dc1bfbe991a34ff5c6a5fc00000000"),
+    /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001", 
+                                   "ddf540d020b9eadfeb8d4bf800000000"),
+    /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001", 
+                                   "8ee4950177ce66bfd71a97f000000000"),
+    /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001", 
+                                   "39e0f787c907117fae352fe000000000"),
+    /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001", 
+                                   "659d2522f7b732ff5c6a5fc000000000"),
+    /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001", 
+                                   "9e8722938612a5feb8d4bf8000000000"),
+    /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001", 
+                                   "e941a65d66b64bfd71a97f0000000000"),
+    /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001", 
+                                   "7b50d19437b097fae352fe0000000000"),
+    /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001", 
+                                   "59d7b68e18712ff5c6a5fc0000000000"),
+    /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001", 
+                                   "4087bab2d5225feb8d4bf80000000000"),
+    /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001", 
+                                   "b470abc03b44bfd71a97f00000000000"),
+    /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001", 
+                                   "366630eaba897fae352fe00000000000"),
+    /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001", 
+                                   "a2dfc77e8512ff5c6a5fc00000000000"),
+    /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001", 
+                                   "1e0d25a14a25feb8d4bf800000000000"),
+    /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001", 
+                                   "9d50a5d3944bfd71a97f000000000000"),
+    /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001", 
+                                   "bf7ab5eb2897fae352fe000000000000"),
+    /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001", 
+                                   "925b14e6512ff5c6a5fc000000000000"),
+    /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001", 
+                                   "724cce0ca25feb8d4bf8000000000000"),
+    /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001", 
+                                   "1af42d1944bfd71a97f0000000000000"),
+    /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001", 
+                                   "0f529e32897fae352fe0000000000000"),
+    /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001", 
+                                   "844e4c6512ff5c6a5fc0000000000000"),
+    /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+                                   "9f40d8ca25feb8d4bf80000000000000"),
+    /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+                                   "9912b1944bfd71a97f00000000000000"),
+    /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+                                   "9c69632897fae352fe00000000000000"),
+    /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+                                   "e1e2c6512ff5c6a5fc00000000000000"),
+    /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+                                   "68058ca25feb8d4bf800000000000000"),
+    /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+                                   "610b1944bfd71a97f000000000000000"),
+    /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+                                   "061632897fae352fe000000000000000"),
+    /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+                                   "1c2c6512ff5c6a5fc000000000000000"),
+    /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+                                   "7858ca25feb8d4bf8000000000000000"),
+    /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+                                   "f0b1944bfd71a97f0000000000000000"),
+    /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+                                   "e1632897fae352fe0000000000000000"),
+    /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+                                   "c2c6512ff5c6a5fc0000000000000000"),
+    /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+                                   "858ca25feb8d4bf80000000000000000"),
+    /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+                                   "0b1944bfd71a97f00000000000000000"),
+    /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+                                   "1632897fae352fe00000000000000000"),
+    /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+                                   "2c6512ff5c6a5fc00000000000000000"),
+    /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+                                   "58ca25feb8d4bf800000000000000000"),
+    /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+                                   "b1944bfd71a97f000000000000000000"),
+    /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+                                   "632897fae352fe000000000000000000"),
+    /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+                                   "c6512ff5c6a5fc000000000000000000"),
+    /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+                                   "8ca25feb8d4bf8000000000000000000"),
+    /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+                                   "1944bfd71a97f0000000000000000000"),
+    /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+                                   "32897fae352fe0000000000000000000"),
+    /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+                                   "6512ff5c6a5fc0000000000000000000"),
+    /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+                                   "ca25feb8d4bf80000000000000000000"),
+    /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+                                   "944bfd71a97f00000000000000000000"),
+    /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+                                   "2897fae352fe00000000000000000000"),
+    /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+                                   "512ff5c6a5fc00000000000000000000"),
+    /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+                                   "a25feb8d4bf800000000000000000000"),
+    /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+                                   "44bfd71a97f000000000000000000000"),
+    /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+                                   "897fae352fe000000000000000000000"),
+    /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+                                   "12ff5c6a5fc000000000000000000000"),
+    /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+                                   "25feb8d4bf8000000000000000000000"),
+    /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+                                   "4bfd71a97f0000000000000000000000"),
+    /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+                                   "97fae352fe0000000000000000000000"),
+    /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+                                   "2ff5c6a5fc0000000000000000000000"),
+    /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+                                   "5feb8d4bf80000000000000000000000"),
+    /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+                                   "bfd71a97f00000000000000000000000"),
+    /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+                                   "7fae352fe00000000000000000000000"),
+    /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+                                   "ff5c6a5fc00000000000000000000000"),
+    /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+                                   "feb8d4bf800000000000000000000000"),
+    /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+                                   "fd71a97f000000000000000000000000"),
+    /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+                                   "fae352fe000000000000000000000000"),
+    /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+                                   "f5c6a5fc000000000000000000000000"),
+    /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+                                   "eb8d4bf8000000000000000000000000"),
+    /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+                                   "d71a97f0000000000000000000000000"),
+    /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+                                   "ae352fe0000000000000000000000000"),
+    /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+                                   "5c6a5fc0000000000000000000000000"),
+    /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+                                   "b8d4bf80000000000000000000000000"),
+    /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+                                   "71a97f00000000000000000000000000"),
+    /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+                                   "e352fe00000000000000000000000000"),
+    /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+                                   "c6a5fc00000000000000000000000000"),
+    /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+                                   "8d4bf800000000000000000000000000"),
+    /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+                                   "1a97f000000000000000000000000000"),
+    /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+                                   "352fe000000000000000000000000000"),
+    /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+                                   "6a5fc000000000000000000000000000"),
+    /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+                                   "d4bf8000000000000000000000000000"),
+    /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+                                   "a97f0000000000000000000000000000"),
+    /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+                                   "52fe0000000000000000000000000000"),
+    /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+                                   "a5fc0000000000000000000000000000"),
+    /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+                                   "4bf80000000000000000000000000000"),
+    /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+                                   "97f00000000000000000000000000000"),
+    /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+                                   "2fe00000000000000000000000000000"),
+    /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+                                   "5fc00000000000000000000000000000"),
+    /* [119] */ new RandomConstant("de000000000000000000000000000001",
+                                   "bf800000000000000000000000000000"),
+    /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+                                   "7f000000000000000000000000000000"),
+    /* [121] */ new RandomConstant("78000000000000000000000000000001",
+                                   "fe000000000000000000000000000000"),
+    /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+                                   "fc000000000000000000000000000000"),
+    /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+                                   "f8000000000000000000000000000000"),
+    /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+                                   "f0000000000000000000000000000000"),
+    /* [125] */ new RandomConstant("80000000000000000000000000000001",
+                                   "e0000000000000000000000000000000"),
+    /* [126] */ new RandomConstant("00000000000000000000000000000001",
+                                   "c0000000000000000000000000000000"),
+    /* [127] */ new RandomConstant("00000000000000000000000000000001",
+                                   "80000000000000000000000000000000")};
+
+  /**
+   *  generate the random number that is "advance" steps
+   *  from an initial random number of 0.  This is done by
+   *  starting with 0, and then advancing by the
+   *  appropriate powers of 2 of the linear congruential
+   *  generator.
+   */
+  public static Unsigned16 skipAhead(Unsigned16 advance) {
+    Unsigned16 result = new Unsigned16();
+    long          bit_map;
+
+    bit_map = advance.getLow8();
+    for (int i = 0; bit_map != 0 && i < 64; i++) {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**i) (x)
+         */
+        result.multiply(genArray[i].a);
+        result.add(genArray[i].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    bit_map = advance.getHigh8();
+    for (int i = 0; bit_map != 0 && i < 64; i++)
+    {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**(i + 64)) (x)
+         */
+        result.multiply(genArray[i+64].a);
+        result.add(genArray[i+64].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    return result;
+  }
+
+  /** 
+   * Generate the next 16 byte random number.
+   */
+  public static void nextRand(Unsigned16 rand) {
+    /* advance the random number forward once using the linear congruential
+     * generator, and then return the new random number
+     */
+    rand.multiply(genArray[0].a);
+    rand.add(genArray[0].c);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
------------------------------------------------------------------------------
    svn:eol-style = native
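
Editorial note on Random16.java (not part of the commit): the skip-ahead comment above relies on the fact that composing an affine map f(x) = a*x + c with itself gives another affine map, with coefficient a*a and constant a*c + c, all mod 2**128. The sketch below illustrates that idea under stated assumptions. It uses java.math.BigInteger as a stand-in for Unsigned16, and the class and method names (SkipAheadSketch, next, skipAhead) are hypothetical. It derives each squared pair on the fly rather than reading it from the precomputed table.

```java
import java.math.BigInteger;

/** Hypothetical illustration of LCG skip-ahead; BigInteger stands in for Unsigned16. */
final class SkipAheadSketch {
  static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);
  // The "a" and "c" constants quoted in the Random16 class comment.
  static final BigInteger A = new BigInteger("2360ed051fc65da44385df649fccf645", 16);
  static final BigInteger C = new BigInteger("4a696d47726179524950202020202001", 16);

  /** One generator step: x -> (a * x + c) mod 2**128. */
  static BigInteger next(BigInteger x) {
    return A.multiply(x).add(C).mod(MOD);
  }

  /** The value reached after "advance" steps from 0, using one step per set bit. */
  static BigInteger skipAhead(BigInteger advance) {
    BigInteger a = A;  // coefficient of f applied 2**i times, starting at i = 0
    BigInteger c = C;  // constant of f applied 2**i times
    BigInteger result = BigInteger.ZERO;
    for (int i = 0; i < advance.bitLength(); i++) {
      if (advance.testBit(i)) {
        result = a.multiply(result).add(c).mod(MOD);
      }
      // Square the map: g(g(x)) = a*a*x + (a*c + c). Update c first, using the old a.
      c = a.multiply(c).add(c).mod(MOD);
      a = a.multiply(a).mod(MOD);
    }
    return result;
  }

  public static void main(String[] args) {
    // Compare 1000 single steps against one skip-ahead call.
    BigInteger x = BigInteger.ZERO;
    for (int n = 0; n < 1000; n++) {
      x = next(x);
    }
    System.out.println(skipAhead(BigInteger.valueOf(1000)).equals(x));
  }
}
```

The order in which the set bits are applied does not matter, because every map involved is a power of the same function f and such powers commute. Precomputing the 128 pairs, as the table in the commit does, trades a little memory for avoiding the squaring work on every call.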

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class TeraChecksum extends Configured implements Tool {
+  static class ChecksumMapper 
+      extends Mapper<Text, Text, NullWritable, Unsigned16> {
+    private Unsigned16 checksum = new Unsigned16();
+    private Unsigned16 sum = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+
+    public void map(Text key, Text value, 
+                    Context context) throws IOException {
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      checksum.set(crc32.getValue());
+      sum.add(checksum);
+    }
+
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      context.write(NullWritable.get(), sum);
+    }
+  }
+
+  static class ChecksumReducer 
+      extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
+
+    public void reduce(NullWritable key, Iterable<Unsigned16> values,
+        Context context) throws IOException, InterruptedException  {
+      Unsigned16 sum = new Unsigned16();
+      for (Unsigned16 val : values) {
+        sum.add(val);
+      }
+      context.write(key, sum);
+    }
+  }
+
+  private static void usage() throws IOException {
+    System.err.println("terasum <out-dir> <report-dir>");
+  }
+
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    if (args.length != 2) {
+      usage();
+      return 2;
+    }
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraSum");
+    job.setJarByClass(TeraChecksum.class);
+    job.setMapperClass(ChecksumMapper.class);
+    job.setReducerClass(ChecksumReducer.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Unsigned16.class);
+    // force a single reducer
+    job.setNumReduceTasks(1);
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  /**
+   * @param args command-line arguments: out-dir and report-dir
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args);
+    System.exit(res);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
------------------------------------------------------------------------------
    svn:eol-style = native
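
Editorial note on TeraChecksum.java (not part of the commit): the job computes a CRC32 per record over the key bytes followed by the value bytes, sums those values in each mapper, and adds the partial sums in a single reducer. Because addition is commutative and associative, the total does not depend on how records are split across mappers. The sketch below shows this with the standard library only. It swaps in java.util.zip.CRC32 for PureJavaCrc32 and BigInteger for Unsigned16, and it assumes that Unsigned16 addition wraps modulo 2**128, which this section does not show. All names in it are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical illustration of order-independent checksum summation. */
final class ChecksumSumSketch {
  // Assumption: the 128-bit sum wraps modulo 2**128.
  static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);

  /** CRC32 over the key bytes followed by the value bytes, as in ChecksumMapper.map. */
  static long recordCrc(byte[] key, byte[] value) {
    CRC32 crc = new CRC32();
    crc.update(key, 0, key.length);
    crc.update(value, 0, value.length);
    return crc.getValue();
  }

  /** Partial sum over records[from..to), playing the role of one mapper. */
  static BigInteger sum(String[][] records, int from, int to) {
    BigInteger total = BigInteger.ZERO;
    for (int i = from; i < to; i++) {
      byte[] key = records[i][0].getBytes(StandardCharsets.UTF_8);
      byte[] value = records[i][1].getBytes(StandardCharsets.UTF_8);
      total = total.add(BigInteger.valueOf(recordCrc(key, value))).mod(MOD);
    }
    return total;
  }

  /** Combine two partial sums, playing the role of the reducer. */
  static BigInteger combine(BigInteger left, BigInteger right) {
    return left.add(right).mod(MOD);
  }

  public static void main(String[] args) {
    String[][] records = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}, {"k4", "v4"}};
    BigInteger whole = sum(records, 0, 4);
    BigInteger split = combine(sum(records, 2, 4), sum(records, 0, 2));
    System.out.println(whole.equals(split));
  }
}
```

A sum of per-record checksums detects lost or altered records regardless of their order, which suits comparing a data set before and after a sort. It does not detect two changes whose checksum differences cancel.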

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate the official GraySort input data set.
+ * The user specifies the number of rows and the output directory and this
+ * class runs a map/reduce program to generate the data.
+ * The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid) 
+ *     (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
+ * <li>The rowid is the right justified row id as a hex number.
+ * </ul>
+ *
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
+ */
+public class TeraGen extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(TeraGen.class);
+
+  public static enum Counters {CHECKSUM}
+
+  public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
+  /**
+   * An input format that assigns ranges of longs to each mapper.
+   */
+  static class RangeInputFormat 
+      extends InputFormat<LongWritable, NullWritable> {
+    
+    /**
+     * An input split consisting of a range of numbers.
+     */
+    static class RangeInputSplit extends InputSplit implements Writable {
+      long firstRow;
+      long rowCount;
+
+      public RangeInputSplit() { }
+
+      public RangeInputSplit(long offset, long length) {
+        firstRow = offset;
+        rowCount = length;
+      }
+
+      public long getLength() throws IOException {
+        return 0;
+      }
+
+      public String[] getLocations() throws IOException {
+        return new String[]{};
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        firstRow = WritableUtils.readVLong(in);
+        rowCount = WritableUtils.readVLong(in);
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVLong(out, firstRow);
+        WritableUtils.writeVLong(out, rowCount);
+      }
+    }
+    
+    /**
+     * A record reader that will generate a range of numbers.
+     */
+    static class RangeRecordReader 
+        extends RecordReader<LongWritable, NullWritable> {
+      long startRow;
+      long finishedRows;
+      long totalRows;
+      LongWritable key = null;
+
+      public RangeRecordReader() {
+      }
+      
+      public void initialize(InputSplit split, TaskAttemptContext context) 
+          throws IOException, InterruptedException {
+        startRow = ((RangeInputSplit)split).firstRow;
+        finishedRows = 0;
+        totalRows = ((RangeInputSplit)split).rowCount;
+      }
+
+      public void close() throws IOException {
+        // NOTHING
+      }
+
+      public LongWritable getCurrentKey() {
+        return key;
+      }
+
+      public NullWritable getCurrentValue() {
+        return NullWritable.get();
+      }
+
+      public float getProgress() throws IOException {
+        return finishedRows / (float) totalRows;
+      }
+
+      public boolean nextKeyValue() {
+        if (key == null) {
+          key = new LongWritable();
+        }
+        if (finishedRows < totalRows) {
+          key.set(startRow + finishedRows);
+          finishedRows += 1;
+          return true;
+        } else {
+          return false;
+        }
+      }
+      
+    }
+
+    public RecordReader<LongWritable, NullWritable> 
+        createRecordReader(InputSplit split, TaskAttemptContext context) 
+        throws IOException {
+      return new RangeRecordReader();
+    }
+
+    /**
+     * Create the desired number of splits, dividing the number of rows
+     * between the mappers.
+     */
+    public List<InputSplit> getSplits(JobContext job) {
+      long totalRows = getNumberOfRows(job);
+      int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+      LOG.info("Generating " + totalRows + " rows using " + numSplits + " maps");
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      long currentRow = 0;
+      for(int split = 0; split < numSplits; ++split) {
+        long goal = 
+          (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
+        splits.add(new RangeInputSplit(currentRow, goal - currentRow));
+        currentRow = goal;
+      }
+      return splits;
+    }
+
+  }
+  
+  static long getNumberOfRows(JobContext job) {
+    return job.getConfiguration().getLong(NUM_ROWS, 0);
+  }
+  
+  static void setNumberOfRows(Job job, long numRows) {
+    job.getConfiguration().setLong(NUM_ROWS, numRows);
+  }
+
+  /**
+   * The Mapper class that, given a row number, generates the appropriate
+   * output line.
+   */
+  public static class SortGenMapper 
+      extends Mapper<LongWritable, NullWritable, Text, Text> {
+
+    private Text key = new Text();
+    private Text value = new Text();
+    private Unsigned16 rand = null;
+    private Unsigned16 rowId = null;
+    private Unsigned16 checksum = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+    private Unsigned16 total = new Unsigned16();
+    private static final Unsigned16 ONE = new Unsigned16(1);
+    private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
+                                     TeraInputFormat.VALUE_LENGTH];
+    private Counter checksumCounter;
+
+    public void map(LongWritable row, NullWritable ignored,
+        Context context) throws IOException, InterruptedException {
+      if (rand == null) {
+        rowId = new Unsigned16(row.get());
+        rand = Random16.skipAhead(rowId);
+        checksumCounter = context.getCounter(Counters.CHECKSUM);
+      }
+      Random16.nextRand(rand);
+      GenSort.generateRecord(buffer, rand, rowId);
+      key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
+      value.set(buffer, TeraInputFormat.KEY_LENGTH, 
+                TeraInputFormat.VALUE_LENGTH);
+      context.write(key, value);
+      crc32.reset();
+      crc32.update(buffer, 0, 
+                   TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
+      checksum.set(crc32.getValue());
+      total.add(checksum);
+      rowId.add(ONE);
+    }
+
+    @Override
+    public void cleanup(Context context) {
+      if (checksumCounter != null) {
+        checksumCounter.increment(total.getLow8());
+      }
+    }
+  }
+
+  private static void usage() throws IOException {
+    System.err.println("teragen <num rows> <output dir>");
+  }
+
+  /**
+   * Parse a number that optionally has a postfix that denotes a base.
+   * @param str a string integer with an optional base {k,m,b,t}.
+   * @return the expanded value
+   */
+  private static long parseHumanLong(String str) {
+    char tail = str.charAt(str.length() - 1);
+    long base = 1;
+    switch (tail) {
+    case 't':
+      base *= 1000L * 1000 * 1000 * 1000;
+      break;
+    case 'b':
+      base *= 1000 * 1000 * 1000;
+      break;
+    case 'm':
+      base *= 1000 * 1000;
+      break;
+    case 'k':
+      base *= 1000;
+      break;
+    default:
+    }
+    if (base != 1) {
+      str = str.substring(0, str.length() - 1);
+    }
+    return Long.parseLong(str) * base;
+  }
+  
+  /**
+   * @param args the cli arguments
+   */
+  public int run(String[] args) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = Job.getInstance(getConf());
+    if (args.length != 2) {
+      usage();
+      return 2;
+    }
+    setNumberOfRows(job, parseHumanLong(args[0]));
+    Path outputDir = new Path(args[1]);
+    if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
+      throw new IOException("Output directory " + outputDir + 
+                            " already exists.");
+    }
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setJobName("TeraGen");
+    job.setJarByClass(TeraGen.class);
+    job.setMapperClass(SortGenMapper.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setInputFormatClass(RangeInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
+    System.exit(res);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
------------------------------------------------------------------------------
    svn:eol-style = native
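
The row-splitting arithmetic in RangeInputFormat.getSplits above can be tried outside Hadoop. The following is a minimal sketch, assuming nothing beyond the JDK; the class name SplitRangesSketch and the method splitRanges are hypothetical and are not part of this commit. It reuses the same cumulative "goal" formula to show that the ranges are contiguous and that their row counts add up to the requested total.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the committed sources.
final class SplitRangesSketch {

  /** Returns {firstRow, rowCount} pairs covering rows [0, totalRows). */
  static List<long[]> splitRanges(long totalRows, int numSplits) {
    List<long[]> ranges = new ArrayList<>();
    long currentRow = 0;
    for (int split = 0; split < numSplits; ++split) {
      // Cumulative row target after this split, rounded up as in getSplits.
      long goal =
          (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
      ranges.add(new long[] {currentRow, goal - currentRow});
      currentRow = goal;
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Ten rows over three splits: the first split takes the extra row.
    for (long[] range : splitRanges(10, 3)) {
      System.out.println("firstRow=" + range[0] + " rowCount=" + range[1]);
    }
  }
}
```

Because each split starts where the previous goal ended, a rounding step can only shift a row from one split to its neighbour; it cannot drop or duplicate a row.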

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An input format that reads the first 10 characters of each line as the key
+ * and the rest of the line as the value. Both key and value are represented
+ * as Text.
+ */
+public class TeraInputFormat extends FileInputFormat<Text,Text> {
+
+  static final String PARTITION_FILENAME = "_partition.lst";
+  private static final String NUM_PARTITIONS = 
+    "mapreduce.terasort.num.partitions";
+  private static final String SAMPLE_SIZE = 
+    "mapreduce.terasort.partitions.sample";
+  static final int KEY_LENGTH = 10;
+  static final int VALUE_LENGTH = 90;
+  static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+  private static MRJobConfig lastContext = null;
+  private static List<InputSplit> lastResult = null;
+
+  static class TeraFileSplit extends FileSplit {
+    static private String[] ZERO_LOCATIONS = new String[0];
+
+    private String[] locations;
+
+    public TeraFileSplit() {
+      locations = ZERO_LOCATIONS;
+    }
+    public TeraFileSplit(Path file, long start, long length, String[] hosts) {
+      super(file, start, length, hosts);
+      try {
+        locations = super.getLocations();
+      } catch (IOException e) {
+        locations = ZERO_LOCATIONS;
+      }
+    }
+
+    // XXXXXX should this also be null-protected?
+    protected void setLocations(String[] hosts) {
+      locations = hosts;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+
+    public String toString() {
+      StringBuilder result = new StringBuilder();
+      result.append(getPath());
+      result.append(" from ");
+      result.append(getStart());
+      result.append(" length ");
+      result.append(getLength());
+      for(String host: getLocations()) {
+        result.append(" ");
+        result.append(host);
+      }
+      return result.toString();
+    }
+  }
+
+  static class TextSampler implements IndexedSortable {
+    private ArrayList<Text> records = new ArrayList<Text>();
+
+    public int compare(int i, int j) {
+      Text left = records.get(i);
+      Text right = records.get(j);
+      return left.compareTo(right);
+    }
+
+    public void swap(int i, int j) {
+      Text left = records.get(i);
+      Text right = records.get(j);
+      records.set(j, left);
+      records.set(i, right);
+    }
+
+    public void addKey(Text key) {
+      synchronized (this) {
+        records.add(new Text(key));
+      }
+    }
+
+    /**
+     * Find the split points for a given sample. The sample keys are sorted
+     * and down sampled to find even split points for the partitions. The
+     * returned keys should be the start of their respective partitions.
+     * @param numPartitions the desired number of partitions
+     * @return an array of size numPartitions - 1 that holds the split points
+     */
+    Text[] createPartitions(int numPartitions) {
+      int numRecords = records.size();
+      System.out.println("Making " + numPartitions + " from " + numRecords + 
+                         " sampled records");
+      if (numPartitions > numRecords) {
+        throw new IllegalArgumentException
+          ("Requested more partitions than input keys (" + numPartitions +
+           " > " + numRecords + ")");
+      }
+      new QuickSort().sort(this, 0, records.size());
+      float stepSize = numRecords / (float) numPartitions;
+      Text[] result = new Text[numPartitions-1];
+      for(int i=1; i < numPartitions; ++i) {
+        result[i-1] = records.get(Math.round(stepSize * i));
+      }
+      return result;
+    }
+  }
+  
+  /**
+   * Use the input splits to take samples of the input and generate sample
+   * keys. By default reads 100,000 keys from 10 locations in the input, sorts
+   * them and picks N-1 keys to generate N equally sized partitions.
+   * @param job the job to sample
+   * @param partFile where to write the output file to
+   * @throws Throwable if something goes wrong
+   */
+  public static void writePartitionFile(final JobContext job, 
+      Path partFile) throws Throwable  {
+    long t1 = System.currentTimeMillis();
+    Configuration conf = job.getConfiguration();
+    final TeraInputFormat inFormat = new TeraInputFormat();
+    final TextSampler sampler = new TextSampler();
+    int partitions = job.getNumReduceTasks();
+    long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
+    final List<InputSplit> splits = inFormat.getSplits(job);
+    long t2 = System.currentTimeMillis();
+    System.out.println("Computing input splits took " + (t2 - t1) + "ms");
+    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
+    System.out.println("Sampling " + samples + " splits of " + splits.size());
+    final long recordsPerSample = sampleSize / samples;
+    final int sampleStep = splits.size() / samples;
+    Thread[] samplerReader = new Thread[samples];
+    SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
+    // take N samples from different parts of the input
+    for(int i=0; i < samples; ++i) {
+      final int idx = i;
+      samplerReader[i] = 
+        new Thread (threadGroup,"Sampler Reader " + idx) {
+        {
+          setDaemon(true);
+        }
+        public void run() {
+          long records = 0;
+          try {
+            TaskAttemptContext context = new TaskAttemptContextImpl(
+              job.getConfiguration(), new TaskAttemptID());
+            RecordReader<Text, Text> reader = 
+              inFormat.createRecordReader(splits.get(sampleStep * idx),
+              context);
+            reader.initialize(splits.get(sampleStep * idx), context);
+            while (reader.nextKeyValue()) {
+              sampler.addKey(new Text(reader.getCurrentKey()));
+              records += 1;
+              if (recordsPerSample <= records) {
+                break;
+              }
+            }
+          } catch (IOException ie){
+            System.err.println("Got an exception while reading splits " +
+                StringUtils.stringifyException(ie));
+            throw new RuntimeException(ie);
+          } catch (InterruptedException e) {
+            // Interrupted while sampling; stop reading and let the thread exit.
+          }
+        }
+      };
+      samplerReader[i].start();
+    }
+    FileSystem outFs = partFile.getFileSystem(conf);
+    DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10, 
+                                           outFs.getDefaultBlockSize(partFile));
+    for (int i = 0; i < samples; i++) {
+      try {
+        samplerReader[i].join();
+        if(threadGroup.getThrowable() != null){
+          throw threadGroup.getThrowable();
+        }
+      } catch (InterruptedException e) {
+      }
+    }
+    for(Text split : sampler.createPartitions(partitions)) {
+      split.write(writer);
+    }
+    writer.close();
+    long t3 = System.currentTimeMillis();
+    System.out.println("Computing partitions took " + (t3 - t2) + "ms");
+  }
+  
+  static class SamplerThreadGroup extends ThreadGroup{
+
+    private Throwable throwable;
+
+    public SamplerThreadGroup(String s) {
+      super(s);
+    }
+    
+    @Override
+    public void uncaughtException(Thread thread, Throwable throwable) {
+      this.throwable = throwable;
+    }
+    
+    public Throwable getThrowable() {
+      return this.throwable;
+    }
+    
+  }
+
+  static class TeraRecordReader extends RecordReader<Text,Text> {
+    private FSDataInputStream in;
+    private long offset;
+    private long length;
+    private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+    private byte[] buffer = new byte[RECORD_LENGTH];
+    private Text key;
+    private Text value;
+
+    public TeraRecordReader() throws IOException {
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Path p = ((FileSplit)split).getPath();
+      FileSystem fs = p.getFileSystem(context.getConfiguration());
+      in = fs.open(p);
+      long start = ((FileSplit)split).getStart();
+      // find the offset to start at a record boundary
+      offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
+      in.seek(start + offset);
+      length = ((FileSplit)split).getLength();
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public Text getCurrentKey() {
+      return key;
+    }
+
+    public Text getCurrentValue() {
+      return value;
+    }
+
+    public float getProgress() throws IOException {
+      return (float) offset / length;
+    }
+
+    public boolean nextKeyValue() throws IOException {
+      if (offset >= length) {
+        return false;
+      }
+      int read = 0;
+      while (read < RECORD_LENGTH) {
+        long newRead = in.read(buffer, read, RECORD_LENGTH - read);
+        if (newRead == -1) {
+          if (read == 0) {
+            return false;
+          } else {
+            throw new EOFException("read past eof");
+          }
+        }
+        read += newRead;
+      }
+      if (key == null) {
+        key = new Text();
+      }
+      if (value == null) {
+        value = new Text();
+      }
+      key.set(buffer, 0, KEY_LENGTH);
+      value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
+      offset += RECORD_LENGTH;
+      return true;
+    }
+  }
+
+  @Override
+  public RecordReader<Text, Text> 
+      createRecordReader(InputSplit split, TaskAttemptContext context) 
+      throws IOException {
+    return new TeraRecordReader();
+  }
+
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    return new TeraFileSplit(file, start, length, hosts);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    if (job == lastContext) {
+      return lastResult;
+    }
+    long t1, t2, t3;
+    t1 = System.currentTimeMillis();
+    lastContext = job;
+    lastResult = super.getSplits(job);
+    t2 = System.currentTimeMillis();
+    System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
+    if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
+      TeraScheduler scheduler = new TeraScheduler(
+        lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
+      lastResult = scheduler.getNewFileSplits();
+      t3 = System.currentTimeMillis(); 
+      System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
+    }
+    return lastResult;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
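
The split-point selection in TextSampler.createPartitions above (sort the sampled keys, then take every stepSize-th key) can be illustrated without Hadoop. This is a minimal sketch under stated assumptions: it uses java.lang.String keys and Collections.sort in place of Text and QuickSort, so the ordering is String's natural ordering rather than Text's byte-wise comparison. The names PartitionPointsSketch and splitPoints are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone sketch; String stands in for Hadoop's Text.
final class PartitionPointsSketch {

  /** Picks numPartitions - 1 split points from a sample of keys. */
  static List<String> splitPoints(List<String> sampledKeys, int numPartitions) {
    int numRecords = sampledKeys.size();
    if (numPartitions > numRecords) {
      throw new IllegalArgumentException(
          "Requested more partitions than input keys (" + numPartitions
          + " > " + numRecords + ")");
    }
    List<String> sorted = new ArrayList<>(sampledKeys);
    Collections.sort(sorted);
    // Same down-sampling step as createPartitions: evenly spaced indices.
    float stepSize = numRecords / (float) numPartitions;
    List<String> result = new ArrayList<>();
    for (int i = 1; i < numPartitions; ++i) {
      result.add(sorted.get(Math.round(stepSize * i)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> sample = Arrays.asList("h", "b", "f", "a", "d", "c", "g", "e");
    // Eight sampled keys, four partitions: indices 2, 4 and 6 of the sorted sample.
    System.out.println(splitPoints(sample, 4));
  }
}
```

Each returned key is the first key of its partition, so N partitions need only N-1 split points; keys below the first split point fall into partition 0.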

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * An output format that writes the key and value appended together.
+ */
+public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
+  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
+  private OutputCommitter committer = null;
+
+  /**
+   * Set the requirement for a final sync before the stream is closed.
+   */
+  static void setFinalSync(JobContext job, boolean newValue) {
+    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+  }
+
+  /**
+   * Does the user want a final sync at close?
+   */
+  public static boolean getFinalSync(JobContext job) {
+    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+  }
+
+  static class TeraRecordWriter extends RecordWriter<Text,Text> {
+    private boolean finalSync = false;
+    private FSDataOutputStream out;
+
+    public TeraRecordWriter(FSDataOutputStream out,
+                            JobContext job) {
+      finalSync = getFinalSync(job);
+      this.out = out;
+    }
+
+    public synchronized void write(Text key, 
+                                   Text value) throws IOException {
+      out.write(key.getBytes(), 0, key.getLength());
+      out.write(value.getBytes(), 0, value.getLength());
+    }
+    
+    public void close(TaskAttemptContext context) throws IOException {
+      if (finalSync) {
+        out.hsync();
+      }
+      out.close();
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext job
+                              ) throws InvalidJobConfException, IOException {
+    // Ensure that the output directory is set
+    Path outDir = getOutputPath(job);
+    if (outDir == null) {
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
+    }
+
+    // get delegation token for outDir's file system
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { outDir }, job.getConfiguration());
+  }
+
+  public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
+                                                 ) throws IOException {
+    Path file = getDefaultWorkFile(job, "");
+    FileSystem fs = file.getFileSystem(job.getConfiguration());
+    FSDataOutputStream fileOut = fs.create(file);
+    return new TeraRecordWriter(fileOut, job);
+  }
+  
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+      throws IOException {
+    if (committer == null) {
+      Path output = getOutputPath(context);
+      committer = new FileOutputCommitter(output, context);
+    }
+    return committer;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
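
TeraRecordWriter above writes each key immediately followed by its value, with no delimiter, and TeraRecordReader relies on the fixed 10 + 90 byte layout to find record boundaries. The following is a minimal sketch of that layout, assuming an in-memory ByteArrayOutputStream in place of FSDataOutputStream; the names RecordLayoutSketch, writeRecord, keyAt and filled are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-alone sketch of the fixed-width record layout.
final class RecordLayoutSketch {
  static final int KEY_LENGTH = 10;
  static final int VALUE_LENGTH = 90;
  static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;

  /** Appends key then value, as TeraRecordWriter.write does. */
  static void writeRecord(ByteArrayOutputStream out, byte[] key, byte[] value) {
    out.write(key, 0, key.length);
    out.write(value, 0, value.length);
  }

  /** Reads the key of the record at the given index using fixed offsets. */
  static byte[] keyAt(byte[] data, int recordIndex) {
    int start = recordIndex * RECORD_LENGTH;
    return Arrays.copyOfRange(data, start, start + KEY_LENGTH);
  }

  /** Builds a byte array of the given length filled with one character. */
  static byte[] filled(int length, char c) {
    byte[] bytes = new byte[length];
    Arrays.fill(bytes, (byte) c);
    return bytes;
  }

  public static void main(String[] args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeRecord(out, filled(KEY_LENGTH, 'A'), filled(VALUE_LENGTH, 'x'));
    writeRecord(out, filled(KEY_LENGTH, 'B'), filled(VALUE_LENGTH, 'y'));
    byte[] data = out.toByteArray();
    System.out.println("bytes written: " + data.length);
    System.out.println("second key: "
        + new String(keyAt(data, 1), StandardCharsets.US_ASCII));
  }
}
```

Since nothing in the byte stream marks the end of a record, the reader and the writer must agree on KEY_LENGTH and VALUE_LENGTH; a key or value of any other length would shift every later record.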