You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [29/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex Thu Apr 18 23:54:18 2013
@@ -0,0 +1,370 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements. See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership. The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License. You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+\documentclass{article}
+\usepackage[pdftex]{hyperref}
+\usepackage[pdftex]{graphicx}
+
+\title{Winning a 60 Second Dash with a Yellow Elephant}
+\author{\href{http://people.apache.org/~omalley}{Owen O'Malley} and
+ \href{http://people.apache.org/~acmurthy}{Arun C. Murthy}\\
+\href{http://www.yahoo.com/}{Yahoo!}\\
+owen@yahoo-inc.com and acm@yahoo-inc.com}
+\date{April 2009}
+\begin{document}
+\maketitle
+\href{http://hadoop.apache.org/core}{Apache Hadoop} is an open source
+software framework that dramatically simplifies writing distributed
+data intensive applications. It provides a distributed file system,
+which is modeled after the Google File System\cite{gfs}, and a
+map/reduce\cite{mapreduce} implementation that manages distributed
+computation. Jim Gray defined a benchmark to compare large sorting
+programs. Since the core of map/reduce is a distributed sort, most of
+the custom code is glue to get the desired behavior.
+
+\section{Benchmark Rules}
+
+Jim Gray's sort benchmark consists of a set of many related
+benchmarks, each with their own rules. All of the sort benchmarks
+measure the time to sort different numbers of 100 byte records. The
+first 10 bytes of each record is the key and the rest is the
+value. The \textbf{minute sort} must finish end to end in less than a
+minute. The \textbf{Gray sort} must sort more than 100 terabytes and
+must run for at least an hour.
+
+\begin{itemize}
+\item The input data must precisely match the data generated by the C
+ data generator.
+\item The input must not be in the operating system's file
+ cache when the job starts. Under Linux, this requires using the memory for something
+ else between sorting runs.
+\item The input and output data must not be compressed.
+\item The output must not overwrite the input.
+\item The output must be synced to disk.
+\item The 128 bit sum of the crc32's of each key/value pair must be
+ calculated for the input and output. Naturally, they must be
+ identical.
+\item The output may be divided into multiple output files, but it
+ must be totally ordered (simply concatenating the output files must
+ produce the completely sorted output).
+\item Starting and distributing the application to the cluster must be
+ included in the execution time.
+\item Any sampling must be included in the execution time.
+\end{itemize}
+
+\section{Hadoop implementation}
+
+We extended the programs from last year to create and manipulate the
+new binary format and match the new rules. There are now 4 Hadoop
+map/reduce applications to support the benchmark:
+\begin{enumerate}
+\item \textbf{TeraGen} is a map/reduce program to generate the data.
+\item \textbf{TeraSort} samples the input data and uses map/reduce to
+ sort the data into a total order.
+\item \textbf{TeraSum} is a map/reduce program that computes the 128 bit
+ sum of the crc32 of each key/value pair.
+\item \textbf{TeraValidate} is a map/reduce program that validates that the
+ output is sorted and computes the sum of the checksums as TeraSum does.
+\end{enumerate}
+The update to the terasort programs will be checked in as
+\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716}.
+
+\textbf{TeraGen} generates input data for the sort that is byte for byte
+equivalent to the C version that was released in March of 2009,
+including specific keys and values. It divides the desired number of
+rows by the desired number of tasks and assigns ranges of rows to each
+map. The map jumps the random number generator to the correct value
+for the first row and generates the following rows.
+
+\textbf{TeraSort} is a standard map/reduce sort, except for a custom
+partitioner that ensures that all of the keys in reduce $N$ are after
+all of the keys in reduce $N-1$. This is a requirement of the contest
+so that the output of the sort is totally ordered, even if it is
+divided up by reduce.
+
+We wrote an input and output format, used by all 4 applications to
+read and write the files in the new format.
+
+\textbf{TeraSum} computes the 128 bit sum of the CRC32 of each
+key/value pair. Each map computes the sum of its input and emits a
+single 128 bit sum. There is a single reduce that adds the sums from
+each map. We used this program on the input directory to calculate the
+sum of the checksums of each key/value pair to check the correctness
+of the output of the sort. We also used TeraSum on a distinct dataset
+that was larger than the total RAM in the cluster to flush the Linux
+file cache between runs of the small (500 GB and 1TB) sorts.
+
+\textbf{TeraValidate} ensures that the output is globally sorted. It
+creates one map per file in the output directory and each map
+ensures that each key is greater than or equal to the previous one. The
+map also generates records with the first and last keys of the file
+and the reduce ensures that the first key of file $i$ is greater than
+the last key of file $i-1$. Any problems are reported as output of the
+reduce with the keys that are out of order. Additionally, TeraValidate
+calculates the sum of checksums of the output directory.
+
+\section{Hardware and Operating System}
+
+We ran our benchmarks on Yahoo's Hammer cluster. Hammer's hardware is
+very similar to the hardware that we used in last year's terabyte
+sort. The hardware and operating system details are:
+
+\begin{itemize}
+\item approximately 3800 nodes (in such a large cluster, nodes are
+ always down)
+\item 2 quad core Xeons @ 2.5ghz per node
+\item 4 SATA disks per node
+\item 8G RAM per node (upgraded to 16GB before the petabyte sort)
+\item 1 gigabit ethernet on each node
+\item 40 nodes per rack
+\item 8 gigabit ethernet uplinks from each rack to the core
+\item Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+\item Sun Java JDK (1.6.0\_05-b13 and 1.6.0\_13-b03) (32 and 64 bit)
+\end{itemize}
+
+We hit a JVM bug in 1.6.0\_05-b13 on the larger sorts (100TB and 1PB)
+and switched over to the later JVM, which resolved the issue. For the
+larger sorts, we used 64 bit JVMs for the Name Node and Job Tracker.
+
+\section{Software and Configuration}
+
+The version of Hadoop we used was a private branch of trunk that was
+started in January 2009, which is after the 0.20 branch was feature
+frozen. We used git to manage our branch and it allowed us to easily
+coordinate our work, track our changes, and resynchronize with the
+current Hadoop trunk.
+
+The changes include:
+
+\begin{enumerate}
+
+\item Updated the terasort example in the Hadoop code base to match
+ the dataset defined by the rule changes in the benchmark from March
+ of 2009.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716})
+
+\item We reimplemented the reducer side of Hadoop's shuffle. The
+ redesign improved the performance of the shuffle and removed
+ bottlenecks and over-throttling. It also made the code more
+ maintainable and understandable by breaking a 3000 line Java file
+ into multiple classes with a clean set of interfaces.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5223}{HADOOP-5223})
+
+\item The new shuffle also fetches multiple map outputs from the same
+ node over each connection rather than one at a time. Fetching
+ multiple map outputs at the same time avoids connection setup costs
+ and also avoids the round trip while the server responds to the request.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-1338}{HADOOP-1338})
+
+\item Allowed configuring timeouts on the shuffle connections and we
+ shortened them for the small sorts. We observed cases where the
+ connections for the shuffle would hang until the timeout, which made
+ low latency jobs impossibly long.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5789}{HADOOP-5789})
+
+\item Set TCP no-delay and more frequent pings between the Task and
+ the Task Tracker to reduce latency in detecting problems.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5788}{HADOOP-5788})
+
+\item We added some protection code to detect incorrect data being
+ transmitted in the shuffle and prevent it from causing the reduce to fail. It
+ appears this is either a JVM NIO bug or Jetty bug that likely
+ affects 0.20 and trunk under heavy load.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5783}{HADOOP-5783})
+
+\item We used LZO compression on the map outputs. On the new dataset, LZO
+ compresses down to 45\% of the original size. By comparison, the
+ dataset from last year compresses to 20\% of the original size. Last
+ year, the shuffle would run out of direct buffers if we used
+ compression on the map outputs.
+
+\item We implemented memory to memory merges in the reduce during the
+ shuffle to combine the map outputs in memory before we finish the
+ shuffle, thereby reducing the work needed when the reduce is
+ running.
+
+\item We multi-threaded the sampling code that read the input set to
+ find the partition points between the reduces. We also wrote a
+ simple partitioner that assumes the keys are evenly
+ distributed. Since the new dataset does not require sampling, the
+ simple partitioner produces very even partitions.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-4946}{HADOOP-4946})
+
+\item On the smaller clusters, we configured the system with faster
+ heartbeat cycles from the Task Trackers to the Job Tracker (it
+ defaults to 10 secs / 1000 nodes, but we made it configurable and
+ brought it down to 2 seconds/1000 nodes to provide lower latency)
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5784}{HADOOP-5784})
+
+\item Typically the Job Tracker assigns tasks to Task Trackers on a
+ first come first served basis. This greedy assignment of tasks did
+ not lead to good data locality. However, by taking a global view and
+ placing all of the map tasks at once, the system achieves much better
+ locality. Rather than implement global scheduling for all of Hadoop,
+ which would be much harder, we implemented a global scheduler for
+ the terasort example in the input format. Basically, the input
+ format computes the splits and assigns work to the nodes that have
+ the fewest blocks first. For a node that has more blocks
+ than map slots, it picks the blocks that have the fewest remaining
+ locations. This greedy global algorithm seems to get very good
+ locality. The input format would schedule the maps and then change
+ the input split descriptions to only have a single location instead
+ of the original 3. This increased task locality by 40\% or so over
+ the greedy scheduler.
+
+\item Hadoop 0.20 added setup and cleanup tasks. Since they are not
+ required for the sort benchmarks, we allow them to be disabled to
+ reduce the latency of starting and stopping the job.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5785}{HADOOP-5785})
+
+\item We discovered a performance problem where in some contexts the
+ cost of using the JNI-based CRC32 was very high. By implementing it
+ in pure Java, the average case is a little slower, but the worst
+ case is much better.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5598}{HADOOP-5598})
+
+\item We found and removed some hard-coded wait loops from the
+ framework that don't matter for large jobs, but can seriously slow
+ down low latency jobs.
+
+\item Allowed setting the logging level for the tasks, so that we
+ could cut down on logging. When running for ``real'' we configure the
+ logging level to WARN instead of the default INFO. Reducing the
+ amount of logging has a huge impact on the performance of the
+ system, but obviously makes debugging and analysis much harder.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5786}{HADOOP-5786})
+
+\item One optimization that we didn't finish is to optimize the job
+ planning code. Currently, it uses an RPC to the Name Node for each
+ input file, which we have observed taking a substantial amount of
+ time. For the terabyte sort, our investigations show that we
+ could save about 4 seconds out of the 8 that were spent on setting
+ up the job.
+ (\href{http://issues.apache.org/jira/browse/HADOOP-5795}{HADOOP-5795})
+
+\end{enumerate}
+
+\section{Results}
+
+Hadoop has made a lot of progress in the last year and we were able to
+run much lower latency jobs as well as much larger jobs. Note that in
+any large cluster and distributed application, there are a lot of
+moving pieces and thus we have seen a wide variation in execution
+times. As Hadoop evolves and becomes more graceful in the presence of
+hardware degradation and failure, this variation should smooth
+out. The best times for each of the listed sort sizes were:
+\\
+
+\begin{tabular}{| c | c | c | c | c | c |}
+\hline
+Bytes & Nodes & Maps & Reduces & Replication & Time \\
+\hline
+$5*10^{11}$ & 1406 & 8000 & 2600 & 1 & 59 seconds \\
+$10^{12}$ & 1460 & 8000 & 2700 & 1 & 62 seconds \\
+$10^{14}$ & 3452 & 190,000 & 10,000 & 2 & 173 minutes \\
+$10^{15}$ & 3658 & 80,000 & 20,000 & 2 & 975 minutes \\
+\hline
+\end{tabular}\\
+
+Within the rules for the 2009 Gray sort, our 500 GB sort set a new
+record for the minute sort and the 1PB sort set a new record of 1.03
+TB/minute. The 62 second terabyte sort would have set a new record,
+but the terabyte benchmark that we won last year has been
+retired. (Clearly the minute sort and terabyte sort are rapidly
+converging, and thus it is not a loss.) One piece of trivia is that
+only the petabyte dataset had any duplicate keys (40 of them).
+
+Because the smaller sorts needed lower latency and faster network, we
+only used part of the cluster for those runs. In particular, instead
+of our normal 5:1 over subscription between racks, we limited it to 16
+nodes in each rack for a 2:1 over subscription. The smaller runs can
+also use output replication of 1. Because they only take minutes to
+run and run on smaller clusters, the likelihood of a node failing is
+fairly low. On the larger runs, failure is expected and thus
+replication of 2 is required. HDFS protects against data loss during
+rack failure by writing the second replica on a different rack and
+thus writing the second replica is relatively slow.
+
+We've included the timelines for the jobs counting from the job
+submission at the Job Tracker. The diagrams show the number of tasks
+running at each point in time. While maps only have a single phase,
+the reduces have three: \textbf{shuffle}, \textbf{merge}, and
+\textbf{reduce}. The shuffle is the transfer of the data from the
+maps. Merge doesn't happen in these benchmarks, because none of the
+reduces need multiple levels of merges. Finally, the reduce phase is
+where the final merge and writing to HDFS happens. I've also included
+a category named \textbf{waste} that represents task attempts that
+were running, but ended up either failing, or being killed (often as
+speculatively executed task attempts). The job logs and configuration
+for the four runs, which are the raw data for the charts, are
+available on
+\href{http://people.apache.org/~omalley/tera-2009/}{http://people.apache.org/~omalley/tera-2009/}.
+
+If you compare this year's charts to last year's, you'll notice that
+tasks are launching much faster now. Last year we only launched one
+task per heartbeat, so it took 40 seconds to get all of the tasks
+launched. Now, Hadoop will fill up a Task Tracker in a single
+heartbeat. Reducing that job launch overhead is very important
+for getting runs under a minute.
+
+As with last year, we ran with significantly larger tasks than the
+defaults for Hadoop. Even with the new more aggressive shuffle,
+minimizing the number of transfers (maps * reduces) is very important
+to the performance of the job. Notice that in the petabyte sort, each
+map is processing 15 GB instead of the default 128 MB and each reduce
+is handling 50 GB. When we ran the petabyte with more typical values
+1.5 GB / map, it took 40 hours to finish. Therefore, to increase
+throughput, it makes sense to consider increasing the default block
+size, which translates into the default map size, to at least 1
+GB.
+
+\section{Comments on the Rule Changes}
+
+The group that runs the Gray Sort Benchmark made very substantial
+changes to the rules this year. The changes were not announced; but
+rather appeared on the website in March. We feel that it was too late
+to make rule changes and that the benchmark should have been changed
+next year. We'd also like to point out that while most of the changes to
+the data generator were positive, it was a poor choice to remove the
+skewed distribution of the keys. The previously skewed distribution
+required sampling of the input to pick good partition points between
+the reduces. The current dataset picks keys so completely at random that
+sampling is counterproductive and yields less even distributions between the
+reduces.
+
+\bibliographystyle{abbrv}
+\bibliography{tera}
+
+\begin{figure}[!p]
+\includegraphics[width=4.21in]{500GBTaskTime.png}
+\caption{500 GB sort tasks across time}\label{500GbTimeline}
+\end{figure}
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1TBTaskTime.png}
+\caption{1 TB sort tasks across time}\label{1TbTimeline}
+\end{figure}
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{100TBTaskTime.png}
+\caption{100 TB sort tasks across time}\label{100TbTimeline}
+\end{figure}
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1PBTaskTime.png}
+\caption{1 PB sort tasks across time}\label{1PbTimeline}
+\end{figure}
+
+\end{document}
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib Thu Apr 18 23:54:18 2013
@@ -0,0 +1,31 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements. See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership. The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License. You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+@INPROCEEDINGS{mapreduce,
+ AUTHOR = "Jeffrey Dean and Sanjay Ghemawat",
+ TITLE = "MapReduce: Simplified Data Processing on Large Clusters",
+ BOOKTITLE = "Sixth Symposium on Operating System Design and Implementation",
+ MONTH = "December",
+ ADDRESS = "San Francisco, CA",
+ YEAR = {2004} }
+
+@INPROCEEDINGS{gfs,
+ AUTHOR = "Sanjay Ghemawat and Howard Gobioff and Shun-Tak Leung",
+ TITLE = "The Google File System",
+ BOOKTITLE = "19th Symposium on Operating Systems Principles",
+ ORGANIZATION = "ACM",
+ MONTH = "October",
+ ADDRESS = "Lake George, NY",
+ YEAR = {2003} }
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+
+/**
+ * A single process data generator for the terasort data. Based on gensort.c
+ * version 1.1 (3 Mar 2009) from Chris Nyberg <ch...@ordinal.com>.
+ */
+public class GenSort {
+
+  /**
+   * Generate a "binary" record suitable for all sort benchmarks *except*
+   * PennySort.
+   *
+   * Record layout (100 bytes):
+   *   bytes  0..9   key, taken from bytes 0..9 of rand
+   *   bytes 10..11  break bytes 0x00 0x11
+   *   bytes 12..43  record number as 32 ascii hexadecimal digits
+   *   bytes 44..47  break bytes 0x88 0x99 0xAA 0xBB
+   *   bytes 48..95  filler: hex digits 20..31 of rand, each repeated 4 times
+   *   bytes 96..99  break bytes 0xCC 0xDD 0xEE 0xFF
+   *
+   * @param recBuf buffer that receives the record; indexes 0..99 are written
+   * @param rand the random number used for the key and the filler
+   * @param recordNumber the number of the record being generated
+   */
+  static void generateRecord(byte[] recBuf, Unsigned16 rand,
+                             Unsigned16 recordNumber) {
+    /* generate the 10-byte key using the high 10 bytes of the 128-bit
+     * random number
+     */
+    for (int i = 0; i < 10; ++i) {
+      recBuf[i] = rand.getByte(i);
+    }
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = 0x00;
+    recBuf[11] = 0x11;
+
+    /* convert the 128-bit record number to 32 digits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[44] = (byte) 0x88;
+    recBuf[45] = (byte) 0x99;
+    recBuf[46] = (byte) 0xAA;
+    recBuf[47] = (byte) 0xBB;
+
+    /* add 48 bytes of filler based on low 48 bits of random number:
+     * 12 hex digits, each written 4 times in a row
+     */
+    for (int i = 0; i < 12; ++i) {
+      recBuf[48 + i * 4] = recBuf[49 + i * 4] = recBuf[50 + i * 4] =
+          recBuf[51 + i * 4] = (byte) rand.getHexDigit(20 + i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[96] = (byte) 0xCC;
+    recBuf[97] = (byte) 0xDD;
+    recBuf[98] = (byte) 0xEE;
+    recBuf[99] = (byte) 0xFF;
+  }
+
+
+  /**
+   * Convert a long, read as an unsigned 64-bit value, into a non-negative
+   * BigInteger. The eight bytes are laid out most significant first and
+   * handed to BigInteger as a magnitude with a positive sign.
+   *
+   * @param x the 64-bit pattern to reinterpret as unsigned
+   * @return the unsigned value of x
+   */
+  private static BigInteger makeBigInteger(long x) {
+    byte[] data = new byte[8];
+    for (int i = 0; i < 8; ++i) {
+      data[i] = (byte) (x >>> (56 - 8 * i));
+    }
+    return new BigInteger(1, data);
+  }
+
+  /** The number of printable ascii characters starting at ' ' used in keys. */
+  private static final BigInteger NINETY_FIVE = new BigInteger("95");
+
+  /**
+   * Generate an ascii record suitable for all sort benchmarks including
+   * PennySort.
+   *
+   * Record layout (100 bytes):
+   *   bytes  0..9   key, base-95 digits offset from ' '
+   *   bytes 10..11  two spaces
+   *   bytes 12..43  record number as 32 ascii hexadecimal digits
+   *   bytes 44..45  two spaces
+   *   bytes 46..97  filler: hex digits 19..31 of rand, each repeated 4 times
+   *   bytes 98..99  "\r\n"
+   *
+   * @param recBuf buffer that receives the record; indexes 0..99 are written
+   * @param rand the random number used for the key and the filler
+   * @param recordNumber the number of the record being generated
+   */
+  static void generateAsciiRecord(byte[] recBuf, Unsigned16 rand,
+                                  Unsigned16 recordNumber) {
+
+    /* generate the 10-byte ascii key using mostly the high 64 bits.
+     */
+    long temp = rand.getHigh8();
+    if (temp < 0) {
+      // use biginteger to avoid the negative sign problem: the value must be
+      // treated as unsigned for the first mod/divide; after dividing by 95
+      // the quotient fits in a non-negative long.
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[0] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[0] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    for (int i = 1; i < 8; ++i) {
+      recBuf[i] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    /* the last two key bytes come from the low 64 bits */
+    temp = rand.getLow8();
+    if (temp < 0) {
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[8] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[8] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    recBuf[9] = (byte) (' ' + (temp % 95));
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = ' ';
+    recBuf[11] = ' ';
+
+    /* convert the 128-bit record number to 32 digits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[44] = ' ';
+    recBuf[45] = ' ';
+
+    /* add 52 bytes of filler: 13 hex digits (19..31) of the random number,
+     * each written 4 times in a row
+     */
+    for (int i = 0; i < 13; ++i) {
+      recBuf[46 + i * 4] = recBuf[47 + i * 4] = recBuf[48 + i * 4] =
+          recBuf[49 + i * 4] = (byte) rand.getHexDigit(19 + i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[98] = '\r';  /* nice for Windows */
+    recBuf[99] = '\n';
+  }
+
+
+  /**
+   * Print the command line help to standard output and terminate the JVM
+   * with exit status 1. This method does not return.
+   */
+  private static void usage() {
+    PrintStream out = System.out;
+    out.println("usage: gensort [-a] [-c] [-bSTARTING_REC_NUM] NUM_RECS FILE_NAME");
+    out.println("-a Generate ascii records required for PennySort or JouleSort.");
+    out.println(" These records are also an alternative input for the other");
+    out.println(" sort benchmarks. Without this flag, binary records will be");
+    out.println(" generated that contain the highest density of randomness in");
+    out.println(" the 10-byte key.");
+    out.println("-c Calculate the sum of the crc32 checksums of each of the");
+    // main() prints the checksum with System.out, so the help text says so.
+    out.println(" generated records and send it to standard output.");
+    out.println("-bN Set the beginning record generated to N. By default the");
+    out.println(" first record generated is record 0.");
+    out.println("NUM_RECS The number of sequential records to generate.");
+    out.println("FILE_NAME The name of the file to write the records to.\n");
+    out.println("Example 1 - to generate 1000000 ascii records starting at record 0 to");
+    out.println("the file named \"pennyinput\":");
+    out.println(" gensort -a 1000000 pennyinput\n");
+    out.println("Example 2 - to generate 1000 binary records beginning with record 2000");
+    out.println("to the file named \"partition2\":");
+    out.println(" gensort -b2000 1000 partition2");
+    System.exit(1);
+  }
+
+
+  /**
+   * Generate a run of consecutive records and write them to a stream.
+   * Each record is 100 bytes and is written with a single write call.
+   * The stream is neither flushed nor closed here; that is left to the
+   * caller.
+   *
+   * @param out the stream that receives the records
+   * @param useAscii true to generate ascii records, false for binary records
+   * @param firstRecordNumber the number of the first record to generate
+   * @param recordsToGenerate how many records to generate
+   * @param checksum if non-null, the crc32 of every generated record is
+   *                 added to this value; pass null to skip checksumming
+   * @throws IOException if writing to the stream fails
+   */
+  public static void outputRecords(OutputStream out,
+                                   boolean useAscii,
+                                   Unsigned16 firstRecordNumber,
+                                   Unsigned16 recordsToGenerate,
+                                   Unsigned16 checksum
+                                   ) throws IOException {
+    byte[] row = new byte[100];
+    Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
+    Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
+    Checksum crc = new PureJavaCrc32();
+    Unsigned16 tmp = new Unsigned16();
+    // lastRecordNumber is one past the final record to generate
+    lastRecordNumber.add(recordsToGenerate);
+    Unsigned16 one = new Unsigned16(1);
+    // presumably positions the generator at the first requested record so
+    // that separate processes can generate disjoint ranges -- see Random16
+    Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
+    while (!recordNumber.equals(lastRecordNumber)) {
+      Random16.nextRand(rand);
+      if (useAscii) {
+        generateAsciiRecord(row, rand, recordNumber);
+      } else {
+        generateRecord(row, rand, recordNumber);
+      }
+      if (checksum != null) {
+        crc.reset();
+        crc.update(row, 0, row.length);
+        tmp.set(crc.getValue());
+        checksum.add(tmp);
+      }
+      recordNumber.add(one);
+      out.write(row);
+    }
+  }
+
+  /**
+   * Command line entry point; see {@link #usage()} for the accepted
+   * arguments. Options are parsed up to the first argument that does not
+   * start with '-'; exactly two positional arguments (NUM_RECS and
+   * FILE_NAME) must follow. When -c is given, the accumulated checksum is
+   * printed to standard output after the file has been written.
+   *
+   * @param args the command line arguments
+   * @throws Exception if parsing the numbers or writing the file fails
+   */
+  public static void main(String[] args) throws Exception {
+    Unsigned16 startingRecord = new Unsigned16();
+    Unsigned16 numberOfRecords;
+    OutputStream out;
+    boolean useAscii = false;
+    Unsigned16 checksum = null;
+
+    int i;
+    for (i = 0; i < args.length; ++i) {
+      String arg = args[i];
+      int argLength = arg.length();
+      if (argLength >= 1 && arg.charAt(0) == '-') {
+        if (argLength < 2) {
+          usage();
+        }
+        switch (arg.charAt(1)) {
+        case 'a':
+          useAscii = true;
+          break;
+        case 'b':
+          startingRecord = Unsigned16.fromDecimal(arg.substring(2));
+          break;
+        case 'c':
+          checksum = new Unsigned16();
+          break;
+        default:
+          usage();
+        }
+      } else {
+        break;
+      }
+    }
+    if (args.length - i != 2) {
+      usage();
+    }
+    numberOfRecords = Unsigned16.fromDecimal(args[i]);
+    out = new FileOutputStream(args[i + 1]);
+    try {
+      outputRecords(out, useAscii, startingRecord, numberOfRecords, checksum);
+    } finally {
+      // always release the file handle, even if record generation fails
+      out.close();
+    }
+    if (checksum != null) {
+      System.out.println(checksum);
+    }
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has already been generated),
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ * or 0x2360ed051fc65da44385df649fccf645
+ * and c is 98910279301475397889117759788405497857
+ * or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on 1.0 of rand16.c from Chris Nyberg
+ * <ch...@ordinal.com>.
+ */
+class Random16 {
+
+ /**
+  * One entry of the skip-ahead table: the multiplier "a" and the additive
+  * constant "c" of a step of the form
+  *   f(x) = (a * x + c) mod 2**128
+  *
+  * The table built from these entries (genArray) holds successive squarings
+  * of the linear congruential generator. Entry 0 holds the "a" coefficient
+  * and "c" constant of the generator itself:
+  *   f(x) = (genArray[0].a * x + genArray[0].c) mod 2**128
+  *
+  * Every later entry holds the "a" and "c" that make up the square
+  * (self-composition) of the previous entry's function:
+  *
+  *   f**2(x) = (genArray[1].a * x + genArray[1].c) mod 2**128
+  *   f**4(x) = (genArray[2].a * x + genArray[2].c) mod 2**128
+  *   f**8(x) = (genArray[3].a * x + genArray[3].c) mod 2**128
+  *   ...
+  */
+ private static class RandomConstant {
+   /** Multiplier of the step. */
+   final Unsigned16 a;
+   /** Additive constant of the step. */
+   final Unsigned16 c;
+
+   /**
+    * @param multiplierText text form of "a", passed to the Unsigned16
+    *                       string constructor
+    * @param incrementText text form of "c", passed to the Unsigned16
+    *                      string constructor
+    */
+   public RandomConstant(String multiplierText, String incrementText) {
+     this.a = new Unsigned16(multiplierText);
+     this.c = new Unsigned16(incrementText);
+   }
+ }
+
+ private static final RandomConstant[] genArray = new RandomConstant[]{
+ /* [ 0] */ new RandomConstant("2360ed051fc65da44385df649fccf645",
+ "4a696d47726179524950202020202001"),
+ /* [ 1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
+ "95e0e48262b3edfe04479485c755b646"),
+ /* [ 2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
+ "882a02c315362b60765f100068b33a1c"),
+ /* [ 3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
+ "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+ /* [ 4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
+ "f25bd15439d16af594c1b1bafa6239f0"),
+ /* [ 5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
+ "89ca67c29c9397d59c612596145db7e0"),
+ /* [ 6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01",
+ "8b6ae036713bd578a8093c8eae5c7fc0"),
+ /* [ 7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01",
+ "98a2542fd23d0dbdff3b886cdb1d3f80"),
+ /* [ 8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
+ "954db923fdb7933e947cd1edcecb7f00"),
+ /* [ 9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
+ "00be4a36657c98cd204e8c8af7dafe00"),
+ /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
+ "991965329dccb28d581199ab18c5fc00"),
+ /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
+ "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+ /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
+ "2b657bbfd6ed9d632079e70c3c97f000"),
+ /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+ "59b60ee4c52fa49e9fe90682bd2fe000"),
+ /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
+ "cc099c88030679464fe86aae8a5fc000"),
+ /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001",
+ "06b9abff9f9f33dd30362c0154bf8000"),
+ /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
+ "e296707121688d5a0260b293a97f0000"),
+ /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
+ "189ffc4701ff23cb8f8acf6b52fe0000"),
+ /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
+ "5141110ab208fb9d61fb47e6a5fc0000"),
+ /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
+ "3c97caa62540f2948d8d340d4bf80000"),
+ /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
+ "1b25cb9cfe5a0c963174f91a97f00000"),
+ /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
+ "0c644570b4a487103c5436352fe00000"),
+ /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001",
+ "3d0589c28869472bde517c6a5fc00000"),
+ /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
+ "bc95e5ab36477e65534738d4bf800000"),
+ /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
+ "ddb02ff72a031c01011f71a97f000000"),
+ /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
+ "2561426086d9acdb6c82e352fe000000"),
+ /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
+ "64a788e3c118ed1c8215c6a5fc000000"),
+ /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
+ "e65ea321908627cfa86b8d4bf8000000"),
+ /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
+ "53d27225604d85f9e1d71a97f0000000"),
+ /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001",
+ "ca5ec7a3ed1fe55e07ae352fe0000000"),
+ /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001",
+ "4daebb2e085330651f5c6a5fc0000000"),
+ /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001",
+ "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+ /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001",
+ "158c62f2b31e496dfd71a97f00000000"),
+ /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001",
+ "290e84a2eb15fd1ffae352fe00000000"),
+ /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001",
+ "e3dc1bfbe991a34ff5c6a5fc00000000"),
+ /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
+ "ddf540d020b9eadfeb8d4bf800000000"),
+ /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
+ "8ee4950177ce66bfd71a97f000000000"),
+ /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001",
+ "39e0f787c907117fae352fe000000000"),
+ /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001",
+ "659d2522f7b732ff5c6a5fc000000000"),
+ /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001",
+ "9e8722938612a5feb8d4bf8000000000"),
+ /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001",
+ "e941a65d66b64bfd71a97f0000000000"),
+ /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001",
+ "7b50d19437b097fae352fe0000000000"),
+ /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001",
+ "59d7b68e18712ff5c6a5fc0000000000"),
+ /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001",
+ "4087bab2d5225feb8d4bf80000000000"),
+ /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
+ "b470abc03b44bfd71a97f00000000000"),
+ /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001",
+ "366630eaba897fae352fe00000000000"),
+ /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001",
+ "a2dfc77e8512ff5c6a5fc00000000000"),
+ /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001",
+ "1e0d25a14a25feb8d4bf800000000000"),
+ /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001",
+ "9d50a5d3944bfd71a97f000000000000"),
+ /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001",
+ "bf7ab5eb2897fae352fe000000000000"),
+ /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001",
+ "925b14e6512ff5c6a5fc000000000000"),
+ /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
+ "724cce0ca25feb8d4bf8000000000000"),
+ /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001",
+ "1af42d1944bfd71a97f0000000000000"),
+ /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001",
+ "0f529e32897fae352fe0000000000000"),
+ /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001",
+ "844e4c6512ff5c6a5fc0000000000000"),
+ /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+ "9f40d8ca25feb8d4bf80000000000000"),
+ /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+ "9912b1944bfd71a97f00000000000000"),
+ /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+ "9c69632897fae352fe00000000000000"),
+ /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+ "e1e2c6512ff5c6a5fc00000000000000"),
+ /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+ "68058ca25feb8d4bf800000000000000"),
+ /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+ "610b1944bfd71a97f000000000000000"),
+ /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+ "061632897fae352fe000000000000000"),
+ /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+ "1c2c6512ff5c6a5fc000000000000000"),
+ /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+ "7858ca25feb8d4bf8000000000000000"),
+ /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+ "f0b1944bfd71a97f0000000000000000"),
+ /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+ "e1632897fae352fe0000000000000000"),
+ /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+ "c2c6512ff5c6a5fc0000000000000000"),
+ /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+ "858ca25feb8d4bf80000000000000000"),
+ /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+ "0b1944bfd71a97f00000000000000000"),
+ /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+ "1632897fae352fe00000000000000000"),
+ /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+ "2c6512ff5c6a5fc00000000000000000"),
+ /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+ "58ca25feb8d4bf800000000000000000"),
+ /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+ "b1944bfd71a97f000000000000000000"),
+ /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+ "632897fae352fe000000000000000000"),
+ /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+ "c6512ff5c6a5fc000000000000000000"),
+ /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+ "8ca25feb8d4bf8000000000000000000"),
+ /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+ "1944bfd71a97f0000000000000000000"),
+ /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+ "32897fae352fe0000000000000000000"),
+ /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+ "6512ff5c6a5fc0000000000000000000"),
+ /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+ "ca25feb8d4bf80000000000000000000"),
+ /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+ "944bfd71a97f00000000000000000000"),
+ /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+ "2897fae352fe00000000000000000000"),
+ /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+ "512ff5c6a5fc00000000000000000000"),
+ /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+ "a25feb8d4bf800000000000000000000"),
+ /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+ "44bfd71a97f000000000000000000000"),
+ /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+ "897fae352fe000000000000000000000"),
+ /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+ "12ff5c6a5fc000000000000000000000"),
+ /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+ "25feb8d4bf8000000000000000000000"),
+ /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+ "4bfd71a97f0000000000000000000000"),
+ /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+ "97fae352fe0000000000000000000000"),
+ /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+ "2ff5c6a5fc0000000000000000000000"),
+ /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+ "5feb8d4bf80000000000000000000000"),
+ /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+ "bfd71a97f00000000000000000000000"),
+ /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+ "7fae352fe00000000000000000000000"),
+ /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+ "ff5c6a5fc00000000000000000000000"),
+ /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+ "feb8d4bf800000000000000000000000"),
+ /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+ "fd71a97f000000000000000000000000"),
+ /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+ "fae352fe000000000000000000000000"),
+ /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+ "f5c6a5fc000000000000000000000000"),
+ /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+ "eb8d4bf8000000000000000000000000"),
+ /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+ "d71a97f0000000000000000000000000"),
+ /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+ "ae352fe0000000000000000000000000"),
+ /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+ "5c6a5fc0000000000000000000000000"),
+ /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+ "b8d4bf80000000000000000000000000"),
+ /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+ "71a97f00000000000000000000000000"),
+ /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+ "e352fe00000000000000000000000000"),
+ /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+ "c6a5fc00000000000000000000000000"),
+ /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+ "8d4bf800000000000000000000000000"),
+ /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+ "1a97f000000000000000000000000000"),
+ /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+ "352fe000000000000000000000000000"),
+ /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+ "6a5fc000000000000000000000000000"),
+ /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+ "d4bf8000000000000000000000000000"),
+ /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+ "a97f0000000000000000000000000000"),
+ /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+ "52fe0000000000000000000000000000"),
+ /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+ "a5fc0000000000000000000000000000"),
+ /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+ "4bf80000000000000000000000000000"),
+ /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+ "97f00000000000000000000000000000"),
+ /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+ "2fe00000000000000000000000000000"),
+ /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+ "5fc00000000000000000000000000000"),
+ /* [119] */ new RandomConstant("de000000000000000000000000000001",
+ "bf800000000000000000000000000000"),
+ /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+ "7f000000000000000000000000000000"),
+ /* [121] */ new RandomConstant("78000000000000000000000000000001",
+ "fe000000000000000000000000000000"),
+ /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+ "fc000000000000000000000000000000"),
+ /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+ "f8000000000000000000000000000000"),
+ /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+ "f0000000000000000000000000000000"),
+ /* [125] */ new RandomConstant("80000000000000000000000000000001",
+ "e0000000000000000000000000000000"),
+ /* [126] */ new RandomConstant("00000000000000000000000000000001",
+ "c0000000000000000000000000000000"),
+ /* [127] */ new RandomConstant("00000000000000000000000000000001",
+ "80000000000000000000000000000000")};
+
+ /**
+  * Generate the random number that is "advance" steps from an initial
+  * random number of 0. Starting from 0, the state is advanced by the
+  * power-of-two steps of the linear congruential generator that
+  * correspond to the set bits of "advance": bit i of the low 8 bytes
+  * selects genArray[i] and bit i of the high 8 bytes selects
+  * genArray[i + 64]. Bits are processed from least to most significant,
+  * low half first.
+  *
+  * @param advance the number of generator steps to skip
+  * @return a new Unsigned16 holding the generator state after the skip
+  */
+ public static Unsigned16 skipAhead(Unsigned16 advance) {
+   Unsigned16 state = new Unsigned16();
+   applySteps(state, advance.getLow8(), 0);
+   applySteps(state, advance.getHigh8(), 64);
+   return state;
+ }
+
+ /**
+  * Advance "state" in place by f**(2**(tableOffset + i)) for every set
+  * bit i of "bits", visiting the set bits in ascending order.
+  */
+ private static void applySteps(Unsigned16 state, long bits,
+                                int tableOffset) {
+   while (bits != 0) {
+     int lowestSet = Long.numberOfTrailingZeros(bits);
+     RandomConstant step = genArray[tableOffset + lowestSet];
+     state.multiply(step.a);
+     state.add(step.c);
+     // clear the bit that was just handled
+     bits &= bits - 1;
+   }
+ }
+
+ /**
+  * Advance the given 16 byte random number to the next one in the
+  * sequence. The argument is updated in place with one step of the
+  * linear congruential generator, i.e. it is multiplied by genArray[0].a
+  * and then genArray[0].c is added.
+  *
+  * @param rand the current random number; holds the next one on return
+  */
+ public static void nextRand(Unsigned16 rand) {
+   final RandomConstant singleStep = genArray[0];
+   rand.multiply(singleStep.a);
+   rand.add(singleStep.c);
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+ /**
+  * A job that checksums a data set read through TeraInputFormat.
+  * Each map task computes a checksum (PureJavaCrc32) per record over the
+  * key bytes followed by the value bytes and adds it to a running
+  * Unsigned16 total; a single reduce task adds up the per-map totals and
+  * writes the grand total to the report directory.
+  */
+ public class TeraChecksum extends Configured implements Tool {
+
+   /**
+    * Sums the per-record checksums of one input split and emits the
+    * total exactly once, from cleanup.
+    */
+   static class ChecksumMapper
+       extends Mapper<Text, Text, NullWritable, Unsigned16> {
+     /** Reused holder for the checksum of the current record. */
+     private Unsigned16 recordCrc = new Unsigned16();
+     /** Running total over all records seen by this task. */
+     private Unsigned16 taskTotal = new Unsigned16();
+     private Checksum crc = new PureJavaCrc32();
+
+     /**
+      * Checksum one record and fold it into the running total.
+      * Nothing is written to the context here.
+      */
+     @Override
+     public void map(Text key, Text value,
+                     Context context) throws IOException {
+       crc.reset();
+       // only the first getLength() bytes of the backing arrays are used
+       final byte[] keyBytes = key.getBytes();
+       crc.update(keyBytes, 0, key.getLength());
+       final byte[] valueBytes = value.getBytes();
+       crc.update(valueBytes, 0, value.getLength());
+       recordCrc.set(crc.getValue());
+       taskTotal.add(recordCrc);
+     }
+
+     /** Emit the total accumulated by this task. */
+     @Override
+     public void cleanup(Context context)
+         throws IOException, InterruptedException {
+       context.write(NullWritable.get(), taskTotal);
+     }
+   }
+
+   /**
+    * Adds up all the partial totals received for a key and writes the
+    * result.
+    */
+   static class ChecksumReducer
+       extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
+
+     @Override
+     public void reduce(NullWritable key, Iterable<Unsigned16> values,
+                        Context context)
+         throws IOException, InterruptedException {
+       Unsigned16 grandTotal = new Unsigned16();
+       for (Unsigned16 partial : values) {
+         grandTotal.add(partial);
+       }
+       context.write(key, grandTotal);
+     }
+   }
+
+   /** Print the expected command line to stderr. */
+   private static void usage() throws IOException {
+     System.err.println("terasum <out-dir> <report-dir>");
+   }
+
+   /**
+    * Configure and run the checksum job.
+    *
+    * @param args the data directory to checksum followed by the directory
+    *             that receives the report
+    * @return 2 if the argument count is wrong, 0 if the job succeeded,
+    *         1 if it failed
+    */
+   public int run(String[] args) throws Exception {
+     Job job = Job.getInstance(getConf());
+     if (args.length != 2) {
+       usage();
+       return 2;
+     }
+     job.setJobName("TeraSum");
+     job.setJarByClass(TeraChecksum.class);
+
+     // input side
+     job.setInputFormatClass(TeraInputFormat.class);
+     TeraInputFormat.setInputPaths(job, new Path(args[0]));
+     job.setMapperClass(ChecksumMapper.class);
+
+     // output side
+     job.setReducerClass(ChecksumReducer.class);
+     job.setOutputKeyClass(NullWritable.class);
+     job.setOutputValueClass(Unsigned16.class);
+     // force a single reducer
+     job.setNumReduceTasks(1);
+     FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+     boolean succeeded = job.waitForCompletion(true);
+     if (succeeded) {
+       return 0;
+     }
+     return 1;
+   }
+
+   /**
+    * Run the tool through ToolRunner and exit with its return code.
+    *
+    * @param args the command-line arguments
+    */
+   public static void main(String[] args) throws Exception {
+     System.exit(ToolRunner.run(new Configuration(), new TeraChecksum(), args));
+   }
+
+ }
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate the official GraySort input data set.
+ * The user specifies the number of rows and the output directory and this
+ * class runs a map/reduce program to generate the data.
+ * The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid)
+ * (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
+ * <li>The rowid is the right justified row id as a hex number.
+ * </ul>
+ *
+ * <p>
+ * To run the program:
+ * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
+ */
+public class TeraGen extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(TeraSort.class);
+
+ public static enum Counters {CHECKSUM}
+
+ public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
+ /**
+ * An input format that assigns ranges of longs to each mapper.
+ */
+ static class RangeInputFormat
+ extends InputFormat<LongWritable, NullWritable> {
+
+ /**
+ * An input split consisting of a range on numbers.
+ */
+ static class RangeInputSplit extends InputSplit implements Writable {
+ long firstRow;
+ long rowCount;
+
+ public RangeInputSplit() { }
+
+ public RangeInputSplit(long offset, long length) {
+ firstRow = offset;
+ rowCount = length;
+ }
+
+ public long getLength() throws IOException {
+ return 0;
+ }
+
+ public String[] getLocations() throws IOException {
+ return new String[]{};
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ firstRow = WritableUtils.readVLong(in);
+ rowCount = WritableUtils.readVLong(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, firstRow);
+ WritableUtils.writeVLong(out, rowCount);
+ }
+ }
+
+ /**
+ * A record reader that will generate a range of numbers.
+ */
+ static class RangeRecordReader
+ extends RecordReader<LongWritable, NullWritable> {
+ long startRow;
+ long finishedRows;
+ long totalRows;
+ LongWritable key = null;
+
+ public RangeRecordReader() {
+ }
+
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ startRow = ((RangeInputSplit)split).firstRow;
+ finishedRows = 0;
+ totalRows = ((RangeInputSplit)split).rowCount;
+ }
+
+ public void close() throws IOException {
+ // NOTHING
+ }
+
+ public LongWritable getCurrentKey() {
+ return key;
+ }
+
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ public float getProgress() throws IOException {
+ return finishedRows / (float) totalRows;
+ }
+
+ public boolean nextKeyValue() {
+ if (key == null) {
+ key = new LongWritable();
+ }
+ if (finishedRows < totalRows) {
+ key.set(startRow + finishedRows);
+ finishedRows += 1;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ }
+
+ public RecordReader<LongWritable, NullWritable>
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ return new RangeRecordReader();
+ }
+
+ /**
+ * Create the desired number of splits, dividing the number of rows
+ * between the mappers.
+ */
+ public List<InputSplit> getSplits(JobContext job) {
+ long totalRows = getNumberOfRows(job);
+ int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+ LOG.info("Generating " + totalRows + " using " + numSplits);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ long currentRow = 0;
+ for(int split = 0; split < numSplits; ++split) {
+ long goal =
+ (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
+ splits.add(new RangeInputSplit(currentRow, goal - currentRow));
+ currentRow = goal;
+ }
+ return splits;
+ }
+
+ }
+
+ static long getNumberOfRows(JobContext job) {
+ return job.getConfiguration().getLong(NUM_ROWS, 0);
+ }
+
+ static void setNumberOfRows(Job job, long numRows) {
+ job.getConfiguration().setLong(NUM_ROWS, numRows);
+ }
+
+ /**
+ * The Mapper class that given a row number, will generate the appropriate
+ * output line.
+ */
+ public static class SortGenMapper
+ extends Mapper<LongWritable, NullWritable, Text, Text> {
+
+ private Text key = new Text();
+ private Text value = new Text();
+ private Unsigned16 rand = null;
+ private Unsigned16 rowId = null;
+ private Unsigned16 checksum = new Unsigned16();
+ private Checksum crc32 = new PureJavaCrc32();
+ private Unsigned16 total = new Unsigned16();
+ private static final Unsigned16 ONE = new Unsigned16(1);
+ private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
+ TeraInputFormat.VALUE_LENGTH];
+ private Counter checksumCounter;
+
+ public void map(LongWritable row, NullWritable ignored,
+ Context context) throws IOException, InterruptedException {
+ if (rand == null) {
+ rowId = new Unsigned16(row.get());
+ rand = Random16.skipAhead(rowId);
+ checksumCounter = context.getCounter(Counters.CHECKSUM);
+ }
+ Random16.nextRand(rand);
+ GenSort.generateRecord(buffer, rand, rowId);
+ key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
+ value.set(buffer, TeraInputFormat.KEY_LENGTH,
+ TeraInputFormat.VALUE_LENGTH);
+ context.write(key, value);
+ crc32.reset();
+ crc32.update(buffer, 0,
+ TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
+ checksum.set(crc32.getValue());
+ total.add(checksum);
+ rowId.add(ONE);
+ }
+
+ @Override
+ public void cleanup(Context context) {
+ if (checksumCounter != null) {
+ checksumCounter.increment(total.getLow8());
+ }
+ }
+ }
+
+ private static void usage() throws IOException {
+ System.err.println("teragen <num rows> <output dir>");
+ }
+
+ /**
+ * Parse a number that optionally has a postfix that denotes a base.
+ * @param str an string integer with an option base {k,m,b,t}.
+ * @return the expanded value
+ */
+ private static long parseHumanLong(String str) {
+ char tail = str.charAt(str.length() - 1);
+ long base = 1;
+ switch (tail) {
+ case 't':
+ base *= 1000 * 1000 * 1000 * 1000;
+ break;
+ case 'b':
+ base *= 1000 * 1000 * 1000;
+ break;
+ case 'm':
+ base *= 1000 * 1000;
+ break;
+ case 'k':
+ base *= 1000;
+ break;
+ default:
+ }
+ if (base != 1) {
+ str = str.substring(0, str.length() - 1);
+ }
+ return Long.parseLong(str) * base;
+ }
+
+ /**
+ * @param args the cli arguments
+ */
+ public int run(String[] args)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = Job.getInstance(getConf());
+ if (args.length != 2) {
+ usage();
+ return 2;
+ }
+ setNumberOfRows(job, parseHumanLong(args[0]));
+ Path outputDir = new Path(args[1]);
+ if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
+ throw new IOException("Output directory " + outputDir +
+ " already exists.");
+ }
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setJobName("TeraGen");
+ job.setJarByClass(TeraGen.class);
+ job.setMapperClass(SortGenMapper.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setInputFormatClass(RangeInputFormat.class);
+ job.setOutputFormatClass(TeraOutputFormat.class);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
+ System.exit(res);
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraGen.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An input format that reads fixed-length 100-byte records: the first 10
+ * bytes of each record are the key and the remaining 90 bytes are the value.
+ * Both key and value are represented as Text.
+ */
+public class TeraInputFormat extends FileInputFormat<Text,Text> {
+
+ /** File name for the partition list; not referenced inside this class. */
+ static final String PARTITION_FILENAME = "_partition.lst";
+ /**
+  * Configuration key read by writePartitionFile as the upper bound on the
+  * number of input splits to sample (default 10).
+  */
+ private static final String NUM_PARTITIONS =
+ "mapreduce.terasort.num.partitions";
+ /**
+  * Configuration key read by writePartitionFile as the total number of keys
+  * to sample across all sampled splits (default 100000).
+  */
+ private static final String SAMPLE_SIZE =
+ "mapreduce.terasort.partitions.sample";
+ /** Number of bytes at the start of each record that form the key. */
+ static final int KEY_LENGTH = 10;
+ /** Number of bytes following the key that form the value. */
+ static final int VALUE_LENGTH = 90;
+ /** Total size of one record in bytes (key plus value). */
+ static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+ // One-entry cache used by getSplits: the job context of the most recent
+ // call and the splits computed for it. Compared by identity, shared by all
+ // instances, and accessed without synchronization.
+ private static MRJobConfig lastContext = null;
+ private static List<InputSplit> lastResult = null;
+
+ static class TeraFileSplit extends FileSplit {
+ static private String[] ZERO_LOCATIONS = new String[0];
+
+ private String[] locations;
+
+ public TeraFileSplit() {
+ locations = ZERO_LOCATIONS;
+ }
+ public TeraFileSplit(Path file, long start, long length, String[] hosts) {
+ super(file, start, length, hosts);
+ try {
+ locations = super.getLocations();
+ } catch (IOException e) {
+ locations = ZERO_LOCATIONS;
+ }
+ }
+
+ // XXXXXX should this also be null-protected?
+ protected void setLocations(String[] hosts) {
+ locations = hosts;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return locations;
+ }
+
+ public String toString() {
+ StringBuffer result = new StringBuffer();
+ result.append(getPath());
+ result.append(" from ");
+ result.append(getStart());
+ result.append(" length ");
+ result.append(getLength());
+ for(String host: getLocations()) {
+ result.append(" ");
+ result.append(host);
+ }
+ return result.toString();
+ }
+ }
+
+   /**
+    * Collects sampled keys and derives partition split points from them.
+    * Implements IndexedSortable so the collected keys can be sorted in place
+    * by QuickSort.
+    */
+   static class TextSampler implements IndexedSortable {
+     private final List<Text> records = new ArrayList<Text>();
+
+     /** Compares the keys stored at positions i and j. */
+     @Override
+     public int compare(int i, int j) {
+       Text left = records.get(i);
+       Text right = records.get(j);
+       return left.compareTo(right);
+     }
+
+     /** Exchanges the keys stored at positions i and j. */
+     @Override
+     public void swap(int i, int j) {
+       Text left = records.get(i);
+       Text right = records.get(j);
+       records.set(j, left);
+       records.set(i, right);
+     }
+
+     /**
+      * Adds a copy of the given key to the sample. Synchronized because
+      * several sampler threads add keys concurrently.
+      * @param key the key to copy into the sample
+      */
+     public synchronized void addKey(Text key) {
+       records.add(new Text(key));
+     }
+
+     /**
+      * Find the split points for a given sample. The sample keys are sorted
+      * and down sampled to find even split points for the partitions. The
+      * returned keys should be the start of their respective partitions.
+      * @param numPartitions the desired number of partitions, at least 1
+      * @return an array of size numPartitions - 1 that holds the split points
+      * @throws IllegalArgumentException if numPartitions is less than 1 or
+      *         greater than the number of sampled keys
+      */
+     Text[] createPartitions(int numPartitions) {
+       int numRecords = records.size();
+       System.out.println("Making " + numPartitions + " from " + numRecords +
+           " sampled records");
+       if (numPartitions < 1) {
+         // Without this check the step size becomes infinite and the result
+         // array would get a negative length.
+         throw new IllegalArgumentException
+             ("Number of partitions must be at least 1 (" + numPartitions +
+              ")");
+       }
+       if (numPartitions > numRecords) {
+         throw new IllegalArgumentException
+             ("Requested more partitions than input keys (" + numPartitions +
+              " > " + numRecords + ")");
+       }
+       new QuickSort().sort(this, 0, numRecords);
+       float stepSize = numRecords / (float) numPartitions;
+       Text[] result = new Text[numPartitions - 1];
+       for (int i = 1; i < numPartitions; ++i) {
+         // The key at the rounded boundary between partition i-1 and i.
+         result[i - 1] = records.get(Math.round(stepSize * i));
+       }
+       return result;
+     }
+   }
+
+   /**
+    * Use the input splits to take samples of the input and generate sample
+    * keys. By default reads 100,000 keys from 10 locations in the input, sorts
+    * them and picks N-1 keys to generate N equally sized partitions, where N
+    * is the number of reduce tasks of the job.
+    * @param job the job to sample
+    * @param partFile where to write the output file to
+    * @throws IOException if there are no splits to sample or the partition
+    *         file cannot be written
+    * @throws InterruptedException if the calling thread is interrupted while
+    *         waiting for the sampler threads
+    * @throws Throwable whatever uncaught failure a sampler thread reported
+    */
+   public static void writePartitionFile(final JobContext job,
+       Path partFile) throws Throwable {
+     long t1 = System.currentTimeMillis();
+     Configuration conf = job.getConfiguration();
+     final TeraInputFormat inFormat = new TeraInputFormat();
+     final TextSampler sampler = new TextSampler();
+     int partitions = job.getNumReduceTasks();
+     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
+     final List<InputSplit> splits = inFormat.getSplits(job);
+     long t2 = System.currentTimeMillis();
+     System.out.println("Computing input splits took " + (t2 - t1) + "ms");
+     int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
+     System.out.println("Sampling " + samples + " splits of " + splits.size());
+     if (samples <= 0) {
+       // Fail with a clear message instead of dividing by zero below.
+       throw new IOException("Cannot sample " + samples + " splits of " +
+           splits.size() + "; check the job input and " + NUM_PARTITIONS);
+     }
+     final long recordsPerSample = sampleSize / samples;
+     final int sampleStep = splits.size() / samples;
+     Thread[] samplerReader = new Thread[samples];
+     SamplerThreadGroup threadGroup =
+         new SamplerThreadGroup("Sampler Reader Thread Group");
+     // take N samples from different parts of the input
+     for (int i = 0; i < samples; ++i) {
+       final int idx = i;
+       samplerReader[i] =
+         new Thread(threadGroup, "Sampler Reader " + idx) {
+           {
+             setDaemon(true);
+           }
+           @Override
+           public void run() {
+             long records = 0;
+             try {
+               InputSplit split = splits.get(sampleStep * idx);
+               TaskAttemptContext context = new TaskAttemptContextImpl(
+                   job.getConfiguration(), new TaskAttemptID());
+               RecordReader<Text, Text> reader =
+                   inFormat.createRecordReader(split, context);
+               reader.initialize(split, context);
+               try {
+                 while (reader.nextKeyValue()) {
+                   // addKey stores its own copy of the key.
+                   sampler.addKey(reader.getCurrentKey());
+                   records += 1;
+                   if (recordsPerSample <= records) {
+                     break;
+                   }
+                 }
+               } finally {
+                 // Release the input stream once this sample is done.
+                 reader.close();
+               }
+             } catch (IOException ie) {
+               System.err.println("Got an exception while reading splits " +
+                   StringUtils.stringifyException(ie));
+               // Reported to the coordinator through the thread group.
+               throw new RuntimeException(ie);
+             } catch (InterruptedException e) {
+               // Stop sampling quietly but keep the interrupt status.
+               Thread.currentThread().interrupt();
+             }
+           }
+         };
+       samplerReader[i].start();
+     }
+     FileSystem outFs = partFile.getFileSystem(conf);
+     DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
+         outFs.getDefaultBlockSize(partFile));
+     try {
+       for (int i = 0; i < samples; i++) {
+         // An interrupt propagates to the caller: writing split points from
+         // an incomplete sample would silently skew the partitions.
+         samplerReader[i].join();
+         Throwable failure = threadGroup.getThrowable();
+         if (failure != null) {
+           throw failure;
+         }
+       }
+       for (Text split : sampler.createPartitions(partitions)) {
+         split.write(writer);
+       }
+     } finally {
+       // Close the stream on failure as well as on success.
+       writer.close();
+     }
+     long t3 = System.currentTimeMillis();
+     System.out.println("Computing parititions took " + (t3 - t2) + "ms");
+   }
+
+   /**
+    * Thread group that remembers the most recent uncaught exception thrown by
+    * any of its threads, so the thread that started them can detect the
+    * failure and rethrow it.
+    */
+   static class SamplerThreadGroup extends ThreadGroup {
+
+     // Written by failing sampler threads and read by the coordinating
+     // thread while other samplers may still be running, hence volatile.
+     private volatile Throwable throwable;
+
+     /** @param s the name of the thread group */
+     public SamplerThreadGroup(String s) {
+       super(s);
+     }
+
+     /**
+      * Records the failure instead of printing it; a later failure replaces
+      * an earlier one.
+      */
+     @Override
+     public void uncaughtException(Thread thread, Throwable throwable) {
+       this.throwable = throwable;
+     }
+
+     /** @return the last recorded failure, or null if no thread has failed */
+     public Throwable getThrowable() {
+       return this.throwable;
+     }
+
+   }
+
+   /**
+    * Reads fixed-length records of RECORD_LENGTH bytes from a file split and
+    * exposes the first KEY_LENGTH bytes as the key and the remaining
+    * VALUE_LENGTH bytes as the value. The key and value objects are reused
+    * between records.
+    */
+   static class TeraRecordReader extends RecordReader<Text,Text> {
+     private FSDataInputStream in;
+     // Alignment skip plus the number of bytes consumed so far, relative to
+     // the start of the split.
+     private long offset;
+     // Length of the split in bytes.
+     private long length;
+     private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+     private byte[] buffer = new byte[RECORD_LENGTH];
+     private Text key;
+     private Text value;
+
+     public TeraRecordReader() throws IOException {
+     }
+
+     /**
+      * Opens the file of the split and positions the stream on the first
+      * record boundary at or after the start of the split.
+      * @param split the split to read; must be a FileSplit
+      * @param context supplies the configuration used to open the file
+      */
+     @Override
+     public void initialize(InputSplit split, TaskAttemptContext context)
+         throws IOException, InterruptedException {
+       FileSplit fileSplit = (FileSplit) split;
+       Path p = fileSplit.getPath();
+       FileSystem fs = p.getFileSystem(context.getConfiguration());
+       in = fs.open(p);
+       long start = fileSplit.getStart();
+       // find the offset to start at a record boundary
+       offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
+       in.seek(start + offset);
+       length = fileSplit.getLength();
+     }
+
+     /** Closes the input stream, if one was opened. */
+     @Override
+     public void close() throws IOException {
+       // initialize may never have run or may have failed before opening.
+       if (in != null) {
+         in.close();
+       }
+     }
+
+     /** @return the key of the current record, or null before the first */
+     @Override
+     public Text getCurrentKey() {
+       return key;
+     }
+
+     /** @return the value of the current record, or null before the first */
+     @Override
+     public Text getCurrentValue() {
+       return value;
+     }
+
+     /**
+      * @return the consumed fraction of the split, between 0.0 and 1.0;
+      *         0.0 for an empty split
+      */
+     @Override
+     public float getProgress() throws IOException {
+       if (length <= 0) {
+         // Avoid NaN or Infinity from dividing by a zero length.
+         return 0.0f;
+       }
+       // The last record may extend past the split end, so clamp to 1.0.
+       return Math.min(1.0f, (float) offset / length);
+     }
+
+     /**
+      * Reads the next record into the reusable key and value.
+      * @return true if a record was read, false once the split is exhausted
+      *         or the file ends on a record boundary
+      * @throws EOFException if the file ends in the middle of a record
+      */
+     @Override
+     public boolean nextKeyValue() throws IOException {
+       if (offset >= length) {
+         return false;
+       }
+       int read = 0;
+       // A single read may return fewer bytes than requested; keep reading
+       // until the whole record is buffered.
+       while (read < RECORD_LENGTH) {
+         int newRead = in.read(buffer, read, RECORD_LENGTH - read);
+         if (newRead == -1) {
+           if (read == 0) {
+             return false;
+           } else {
+             throw new EOFException("read past eof");
+           }
+         }
+         read += newRead;
+       }
+       if (key == null) {
+         key = new Text();
+       }
+       if (value == null) {
+         value = new Text();
+       }
+       key.set(buffer, 0, KEY_LENGTH);
+       value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
+       offset += RECORD_LENGTH;
+       return true;
+     }
+   }
+
+ @Override
+ public RecordReader<Text, Text>
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ return new TeraRecordReader();
+ }
+
+   /**
+    * Creates splits as TeraFileSplit instances. getSplits converts the base
+    * splits to a TeraFileSplit array, so this override must stay in effect;
+    * the annotation lets the compiler verify that it still matches the
+    * superclass method.
+    * @param file the file the split belongs to
+    * @param start offset of the first byte of the split
+    * @param length number of bytes in the split
+    * @param hosts the hosts holding the data
+    * @return a new TeraFileSplit
+    */
+   @Override
+   protected FileSplit makeSplit(Path file, long start, long length,
+       String[] hosts) {
+     return new TeraFileSplit(file, start, length, hosts);
+   }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ if (job == lastContext) {
+ return lastResult;
+ }
+ long t1, t2, t3;
+ t1 = System.currentTimeMillis();
+ lastContext = job;
+ lastResult = super.getSplits(job);
+ t2 = System.currentTimeMillis();
+ System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
+ if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
+ TeraScheduler scheduler = new TeraScheduler(
+ lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
+ lastResult = scheduler.getNewFileSplits();
+ t3 = System.currentTimeMillis();
+ System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
+ }
+ return lastResult;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * An output format that writes the key and value appended together.
+ */
+public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
+ static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
+ private OutputCommitter committer = null;
+
+ /**
+ * Set the requirement for a final sync before the stream is closed.
+ */
+ static void setFinalSync(JobContext job, boolean newValue) {
+ job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+ }
+
+ /**
+ * Does the user want a final sync at close?
+ */
+ public static boolean getFinalSync(JobContext job) {
+ return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+ }
+
+ static class TeraRecordWriter extends RecordWriter<Text,Text> {
+ private boolean finalSync = false;
+ private FSDataOutputStream out;
+
+ public TeraRecordWriter(FSDataOutputStream out,
+ JobContext job) {
+ finalSync = getFinalSync(job);
+ this.out = out;
+ }
+
+ public synchronized void write(Text key,
+ Text value) throws IOException {
+ out.write(key.getBytes(), 0, key.getLength());
+ out.write(value.getBytes(), 0, value.getLength());
+ }
+
+ public void close(TaskAttemptContext context) throws IOException {
+ if (finalSync) {
+ out.hsync();
+ }
+ out.close();
+ }
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext job
+ ) throws InvalidJobConfException, IOException {
+ // Ensure that the output directory is set
+ Path outDir = getOutputPath(job);
+ if (outDir == null) {
+ throw new InvalidJobConfException("Output directory not set in JobConf.");
+ }
+
+ // get delegation token for outDir's file system
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { outDir }, job.getConfiguration());
+ }
+
+ public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
+ ) throws IOException {
+ Path file = getDefaultWorkFile(job, "");
+ FileSystem fs = file.getFileSystem(job.getConfiguration());
+ FSDataOutputStream fileOut = fs.create(file);
+ return new TeraRecordWriter(fileOut, job);
+ }
+
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException {
+ if (committer == null) {
+ Path output = getOutputPath(context);
+ committer = new FileOutputCommitter(output, context);
+ }
+ return committer;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraOutputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native