You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/21 16:36:14 UTC
svn commit: r882929 [6/7] - in
/hadoop/pig/branches/branch-0.6/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/schema/
src/java/org/apache/hadoop/zebra/tfile/ src/java/org...
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeqFileComparison.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeqFileComparison.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeqFileComparison.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeqFileComparison.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner.Entry;
+
+public class TestTFileSeqFileComparison extends TestCase {
+ MyOptions options;
+
+ private FileSystem fs;
+ private Configuration conf;
+ private long startTimeEpoch;
+ private long finishTimeEpoch;
+ private DateFormat formatter;
+ byte[][] dictionary;
+
+ /**
+  * Builds the benchmark fixture: default options when none were injected,
+  * a configuration carrying the TFile buffer sizes, the file system owning
+  * the root directory, the timestamp formatter and the word dictionary.
+  */
+ @Override
+ public void setUp() throws IOException {
+   if (options == null) {
+     // No options injected (e.g. by main()), so parse an empty command line.
+     options = new MyOptions(new String[0]);
+   }
+
+   conf = new Configuration();
+   conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
+   conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
+
+   final Path root = new Path(options.rootDir);
+   fs = root.getFileSystem(conf);
+
+   formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+   setUpDictionary();
+ }
+
+ /**
+  * Fills {@code dictionary} with {@code options.dictSize} random byte words
+  * whose lengths lie in [minWordLen, maxWordLen).
+  */
+ private void setUpDictionary() {
+   final Random random = new Random();
+   dictionary = new byte[options.dictSize][];
+   int slot = 0;
+   while (slot < options.dictSize) {
+     final int spread = options.maxWordLen - options.minWordLen;
+     final byte[] word = new byte[random.nextInt(spread) + options.minWordLen];
+     dictionary[slot] = word;
+     random.nextBytes(word);
+     slot++;
+   }
+ }
+
+ /** Nothing to release: this fixture holds no resources that need cleanup here. */
+ @Override
+ public void tearDown() throws IOException {
+   // intentionally empty
+ }
+
+ /** Records the current wall-clock time as the start of a timed section and logs it. */
+ public void startTime() throws IOException {
+   final long now = System.currentTimeMillis();
+   startTimeEpoch = now;
+   final String banner = formatTime() + " Started timing.";
+   System.out.println(banner);
+ }
+
+ /** Records the current wall-clock time as the end of a timed section and logs it. */
+ public void stopTime() throws IOException {
+   final long now = System.currentTimeMillis();
+   finishTimeEpoch = now;
+   final String banner = formatTime() + " Stopped timing.";
+   System.out.println(banner);
+ }
+
+ /**
+  * @return milliseconds between the last startTime() and the last stopTime()
+  */
+ public long getIntervalMillis() throws IOException {
+   final long elapsed = finishTimeEpoch - startTimeEpoch;
+   return elapsed;
+ }
+
+ /** Prints {@code message} to standard output, prefixed with the current formatted time. */
+ public void printlnWithTimestamp(String message) throws IOException {
+   final String stamped = formatTime() + " " + message;
+   System.out.println(stamped);
+ }
+
+ /*
+ * Formats an epoch timestamp given in milliseconds with the shared formatter.
+ * setUp() creates that formatter with the pattern "yyyy-MM-dd HH:mm:ss", so
+ * the result is a calendar date and time of day, not an elapsed duration.
+ */
+ public String formatTime(long milis) {
+ return formatter.format(milis);
+ }
+
+ /** @return the current wall-clock time rendered by {@link #formatTime(long)} */
+ public String formatTime() {
+   final long now = System.currentTimeMillis();
+   return formatTime(now);
+ }
+
+ /** Minimal write-side abstraction shared by the TFile and SequenceFile writers under test. */
+ private interface KVAppendable {
+   /** Appends one key/value pair to the underlying file. */
+   void append(BytesWritable key, BytesWritable value) throws IOException;
+
+   /** Finishes the file and releases the underlying resources. */
+   void close() throws IOException;
+ }
+
+ /**
+  * Minimal read-side abstraction shared by the TFile and SequenceFile readers
+  * under test. After a successful {@link #next()} the key/value accessors
+  * describe the row that was just read; the returned arrays may be longer
+  * than the reported lengths.
+  */
+ private interface KVReadable {
+   /** Moves to the next row; returns false once the input is exhausted. */
+   boolean next() throws IOException;
+
+   byte[] getKey();
+
+   int getKeyLength();
+
+   byte[] getValue();
+
+   int getValueLength();
+
+   /** Releases the underlying resources. */
+   void close() throws IOException;
+ }
+
+ /** {@link KVAppendable} that writes rows into a TFile on the given file system. */
+ static class TFileAppendable implements KVAppendable {
+   private FSDataOutputStream fsdos;
+   private TFile.Writer writer;
+
+   /**
+    * Creates (overwriting if present) the file at {@code path} and opens a
+    * TFile writer on it. No comparator name is passed to the writer.
+    *
+    * @param compress     name of the compression algorithm handed to the writer
+    * @param minBlkSize   minimum block size handed to the writer
+    * @param osBufferSize buffer size used when creating the output stream
+    */
+   public TFileAppendable(FileSystem fs, Path path, String compress,
+       int minBlkSize, int osBufferSize, Configuration conf)
+       throws IOException {
+     this.fsdos = fs.create(path, true, osBufferSize);
+     this.writer = new TFile.Writer(fsdos, minBlkSize, compress, null, conf);
+   }
+
+   public void append(BytesWritable key, BytesWritable value)
+       throws IOException {
+     // Only the first getSize() bytes of each backing array are valid data.
+     writer.append(key.get(), 0, key.getSize(), value.get(), 0, value
+         .getSize());
+   }
+
+   /**
+    * Closes the writer and then the stream. The stream is closed even when
+    * closing the writer fails, so a failed flush cannot leak the file handle.
+    */
+   public void close() throws IOException {
+     try {
+       writer.close();
+     } finally {
+       fsdos.close();
+     }
+   }
+ }
+
+ /**
+  * {@link KVReadable} that scans a TFile from start to end, copying each row
+  * into reusable key and value buffers.
+  */
+ static class TFileReadable implements KVReadable {
+   private FSDataInputStream fsdis;
+   private TFile.Reader reader;
+   private TFile.Reader.Scanner scanner;
+   private byte[] keyBuffer;
+   private int keyLength;
+   private byte[] valueBuffer;
+   private int valueLength;
+
+   /**
+    * Opens {@code path} and positions a scanner over the whole file.
+    *
+    * @param osBufferSize buffer size used when opening the input stream
+    */
+   public TFileReadable(FileSystem fs, Path path, int osBufferSize,
+       Configuration conf) throws IOException {
+     this.fsdis = fs.open(path, osBufferSize);
+     this.reader =
+         new TFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
+     this.scanner = reader.createScanner();
+     keyBuffer = new byte[32];
+     valueBuffer = new byte[32];
+   }
+
+   /**
+    * Returns {@code buffer} when it can hold {@code size} bytes, otherwise a
+    * fresh, larger array. Old contents are not copied because the caller
+    * overwrites the buffer immediately.
+    */
+   private static byte[] ensureCapacity(byte[] buffer, int size) {
+     if (size <= buffer.length) {
+       return buffer;
+     }
+     return new byte[Math.max(2 * buffer.length, 2 * size - buffer.length)];
+   }
+
+   private void checkKeyBuffer(int size) {
+     keyBuffer = ensureCapacity(keyBuffer, size);
+   }
+
+   private void checkValueBuffer(int size) {
+     valueBuffer = ensureCapacity(valueBuffer, size);
+   }
+
+   /** @return the key buffer; only the first getKeyLength() bytes are valid */
+   public byte[] getKey() {
+     return keyBuffer;
+   }
+
+   public int getKeyLength() {
+     return keyLength;
+   }
+
+   /** @return the value buffer; only the first getValueLength() bytes are valid */
+   public byte[] getValue() {
+     return valueBuffer;
+   }
+
+   public int getValueLength() {
+     return valueLength;
+   }
+
+   /**
+    * Copies the current row into the buffers and advances the scanner.
+    *
+    * @return false when the scanner is already at the end of the file
+    */
+   public boolean next() throws IOException {
+     if (scanner.atEnd()) {
+       return false;
+     }
+     Entry entry = scanner.entry();
+     keyLength = entry.getKeyLength();
+     checkKeyBuffer(keyLength);
+     entry.getKey(keyBuffer);
+     valueLength = entry.getValueLength();
+     checkValueBuffer(valueLength);
+     entry.getValue(valueBuffer);
+     scanner.advance();
+     return true;
+   }
+
+   /**
+    * Closes scanner, reader and stream in that order. Each later resource is
+    * still closed when an earlier close fails.
+    */
+   public void close() throws IOException {
+     try {
+       scanner.close();
+     } finally {
+       try {
+         reader.close();
+       } finally {
+         fsdis.close();
+       }
+     }
+   }
+ }
+
+ /** {@link KVAppendable} that writes rows into a SequenceFile. */
+ static class SeqFileAppendable implements KVAppendable {
+   private FSDataOutputStream fsdos;
+   private SequenceFile.Writer writer;
+
+   /**
+    * Creates (overwriting if present) the file at {@code path} and opens a
+    * SequenceFile writer on it: block-compressed with the matching codec for
+    * "lzo" and "gz", uncompressed for "none".
+    *
+    * @param osBufferSize buffer size used when creating the output stream
+    * @param minBlkSize   accepted for signature parity with the TFile writer;
+    *                     not used by this implementation
+    * @throws IOException if {@code compress} is not one of none, lzo, gz, or
+    *                     the file or writer cannot be created
+    */
+   public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
+       String compress, int minBlkSize) throws IOException {
+     Configuration conf = new Configuration();
+     conf.setBoolean("hadoop.native.lib", true);
+
+     final boolean compressed = !"none".equals(compress);
+     CompressionCodec codec = null;
+     if ("lzo".equals(compress)) {
+       codec = Compression.Algorithm.LZO.getCodec();
+     } else if ("gz".equals(compress)) {
+       codec = Compression.Algorithm.GZ.getCodec();
+     } else if (compressed) {
+       throw new IOException("Codec not supported.");
+     }
+
+     this.fsdos = fs.create(path, true, osBufferSize);
+
+     boolean opened = false;
+     try {
+       // codec is null exactly when no compression was requested.
+       writer =
+           SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
+               BytesWritable.class, compressed
+                   ? SequenceFile.CompressionType.BLOCK
+                   : SequenceFile.CompressionType.NONE, codec);
+       opened = true;
+     } finally {
+       if (!opened) {
+         // Do not leak the freshly created stream when the writer fails.
+         fsdos.close();
+       }
+     }
+   }
+
+   public void append(BytesWritable key, BytesWritable value)
+       throws IOException {
+     writer.append(key, value);
+   }
+
+   /**
+    * Closes the writer and then the stream. The stream is closed even when
+    * closing the writer fails.
+    */
+   public void close() throws IOException {
+     try {
+       writer.close();
+     } finally {
+       fsdos.close();
+     }
+   }
+ }
+
+ /** {@link KVReadable} that iterates over the rows of a SequenceFile. */
+ static class SeqFileReadable implements KVReadable {
+   private SequenceFile.Reader reader;
+   private BytesWritable key;
+   private BytesWritable value;
+
+   /**
+    * Opens {@code path} with a private configuration whose
+    * "io.file.buffer.size" is set to {@code osBufferSize}.
+    */
+   public SeqFileReadable(FileSystem fs, Path path, int osBufferSize)
+       throws IOException {
+     final Configuration readerConf = new Configuration();
+     readerConf.setInt("io.file.buffer.size", osBufferSize);
+     this.reader = new SequenceFile.Reader(fs, path, readerConf);
+     this.key = new BytesWritable();
+     this.value = new BytesWritable();
+   }
+
+   /** Reads the next row into the reusable key/value holders. */
+   public boolean next() throws IOException {
+     final boolean hasRow = reader.next(key, value);
+     return hasRow;
+   }
+
+   public byte[] getKey() {
+     return key.get();
+   }
+
+   public int getKeyLength() {
+     return key.getSize();
+   }
+
+   public byte[] getValue() {
+     return value.get();
+   }
+
+   public int getValueLength() {
+     return value.getSize();
+   }
+
+   public void close() throws IOException {
+     reader.close();
+   }
+ }
+
+ /**
+  * Prints two timestamped lines for the last timed section: the duration
+  * with the raw payload size and throughput, then the on-disk size of
+  * {@code path} with the throughput computed from that size.
+  *
+  * @param totalBytes number of key and value bytes processed
+  */
+ private void reportStats(Path path, long totalBytes) throws IOException {
+   final long elapsedMs = getIntervalMillis();
+   final long storedBytes = fs.getFileStatus(path).getLen();
+
+   final double rawMb = (double) totalBytes / 1024 / 1024;
+   final double rawMbPerSec =
+       (double) totalBytes / elapsedMs * 1000 / 1024 / 1024;
+   final String rawLine =
+       String.format(
+           "Duration: %dms...total size: %.2fMB...raw thrpt: %.2fMB/s",
+           elapsedMs, rawMb, rawMbPerSec);
+   printlnWithTimestamp(rawLine);
+
+   final double storedMb = (double) storedBytes / 1024 / 1024;
+   final double storedMbPerSec =
+       (double) storedBytes / elapsedMs * 1000 / 1024 / 1024;
+   final String storedLine =
+       String.format(
+           "Compressed size: %.2fMB...compressed thrpt: %.2fMB/s.",
+           storedMb, storedMbPerSec);
+   printlnWithTimestamp(storedLine);
+ }
+
+ /**
+  * Fills the first {@code len} bytes of {@code tmp} with randomly chosen
+  * dictionary words (the last word is truncated to fit) and stores those
+  * bytes in {@code bw}.
+  */
+ private void fillBuffer(Random rng, BytesWritable bw, byte[] tmp, int len) {
+   for (int filled = 0; filled < len;) {
+     final byte[] picked = dictionary[rng.nextInt(dictionary.length)];
+     final int chunk = Math.min(picked.length, len - filled);
+     System.arraycopy(picked, 0, tmp, filled, chunk);
+     filled += chunk;
+   }
+   bw.set(tmp, 0, len);
+ }
+
+ /**
+  * Appends random rows to {@code appendable} until the file at {@code path}
+  * reports at least {@code fileSize} bytes, timing the loop and printing the
+  * statistics afterwards.
+  *
+  * Key lengths are drawn from [baseKlen, 2*baseKlen) and value lengths from
+  * [baseVlen, 2*baseVlen). The appendable is always closed, even when
+  * writing fails part-way through.
+  */
+ private void timeWrite(Path path, KVAppendable appendable, int baseKlen,
+     int baseVlen, long fileSize) throws IOException {
+   long totalBytes = 0;
+   try {
+     int maxKlen = baseKlen * 2;
+     int maxVlen = baseVlen * 2;
+     BytesWritable key = new BytesWritable();
+     BytesWritable value = new BytesWritable();
+     byte[] keyBuffer = new byte[maxKlen];
+     byte[] valueBuffer = new byte[maxVlen];
+     Random rng = new Random(options.seed);
+     printlnWithTimestamp("Start writing: " + path.getName() + "...");
+     startTime();
+
+     for (long i = 0; true; ++i) {
+       // Querying the file length is costly, so test it every 1000 rows only.
+       if (i % 1000 == 0 && fs.getFileStatus(path).getLen() >= fileSize) {
+         break;
+       }
+       int klen = rng.nextInt(baseKlen) + baseKlen;
+       int vlen = rng.nextInt(baseVlen) + baseVlen;
+       // fillBuffer() already stores the generated bytes in key/value, so no
+       // second set() call is needed inside the timed loop.
+       fillBuffer(rng, key, keyBuffer, klen);
+       fillBuffer(rng, value, valueBuffer, vlen);
+       appendable.append(key, value);
+       totalBytes += klen;
+       totalBytes += vlen;
+     }
+     // The clock stops before close(), as before: closing is not timed.
+     stopTime();
+   } finally {
+     appendable.close();
+   }
+   reportStats(path, totalBytes);
+ }
+
+ /**
+  * Reads every row from {@code readable}, timing the scan and printing the
+  * statistics afterwards. The readable is always closed, even when reading
+  * fails part-way through.
+  */
+ private void timeRead(Path path, KVReadable readable) throws IOException {
+   long totalBytes = 0;
+   try {
+     printlnWithTimestamp("Start reading: " + path.getName() + "...");
+     startTime();
+     while (readable.next()) {
+       totalBytes += readable.getKeyLength();
+       totalBytes += readable.getValueLength();
+     }
+     // The clock stops before close(), as before: closing is not timed.
+     stopTime();
+   } finally {
+     readable.close();
+   }
+   reportStats(path, totalBytes);
+ }
+
+ /** Writes the benchmark TFile ("TFile.Performance" under the root directory) and times it. */
+ private void createTFile(String parameters, String compress)
+     throws IOException {
+   System.out.println("=== TFile: Creation (" + parameters + ") === ");
+   final Path target = new Path(options.rootDir, "TFile.Performance");
+   final KVAppendable sink =
+       new TFileAppendable(fs, target, compress, options.minBlockSize,
+           options.osOutputBufferSize, conf);
+   timeWrite(target, sink, options.keyLength, options.valueLength,
+       options.fileSize);
+ }
+
+ /**
+  * Reads the benchmark TFile back and times it.
+  *
+  * @param delFile when true, the file is deleted afterwards if it exists
+  */
+ private void readTFile(String parameters, boolean delFile) throws IOException {
+   System.out.println("=== TFile: Reading (" + parameters + ") === ");
+   final Path target = new Path(options.rootDir, "TFile.Performance");
+   final KVReadable source =
+       new TFileReadable(fs, target, options.osInputBufferSize, conf);
+   timeRead(target, source);
+   if (delFile && fs.exists(target)) {
+     fs.delete(target, true);
+   }
+ }
+
+ /** Writes the benchmark SequenceFile ("SeqFile.Performance" under the root directory) and times it. */
+ private void createSeqFile(String parameters, String compress)
+     throws IOException {
+   System.out.println("=== SeqFile: Creation (" + parameters + ") === ");
+   final Path target = new Path(options.rootDir, "SeqFile.Performance");
+   final KVAppendable sink =
+       new SeqFileAppendable(fs, target, options.osOutputBufferSize, compress,
+           options.minBlockSize);
+   timeWrite(target, sink, options.keyLength, options.valueLength,
+       options.fileSize);
+ }
+
+ /**
+  * Reads the benchmark SequenceFile back and times it.
+  *
+  * @param delFile when true, the file is deleted afterwards if it exists
+  */
+ private void readSeqFile(String parameters, boolean delFile)
+     throws IOException {
+   System.out.println("=== SeqFile: Reading (" + parameters + ") === ");
+   final Path target = new Path(options.rootDir, "SeqFile.Performance");
+   final KVReadable source =
+       new SeqFileReadable(fs, target, options.osInputBufferSize);
+   timeRead(target, source);
+   if (delFile && fs.exists(target)) {
+     fs.delete(target, true);
+   }
+ }
+
+ /**
+  * Runs the write/read benchmark for one compression scheme, in the order
+  * SeqFile, TFile, TFile, SeqFile, deleting each file after it is read.
+  * The scheme is skipped (with a message) when TFile does not list it among
+  * its supported compression algorithms.
+  */
+ private void compareRun(String compress) throws IOException {
+   final String[] algorithms = TFile.getSupportedCompressionAlgorithms();
+   boolean available = false;
+   for (int i = 0; i < algorithms.length && !available; i++) {
+     available = algorithms[i].equals(compress);
+   }
+   if (!available) {
+     System.out.println("Skipped for " + compress);
+     return;
+   }
+
+   options.compress = compress;
+   final String parameters = parameters2String(options);
+
+   createSeqFile(parameters, compress);
+   readSeqFile(parameters, true);
+
+   createTFile(parameters, compress);
+   readTFile(parameters, true);
+
+   createTFile(parameters, compress);
+   readTFile(parameters, true);
+
+   createSeqFile(parameters, compress);
+   readSeqFile(parameters, true);
+ }
+
+ /**
+  * Runs the comparison for "none", "lzo" and "gz" in turn, applying the
+  * per-scheme TFile input/output buffer sizes from the options first.
+  */
+ public void testRunComparisons() throws IOException {
+   final String[] schemes = { "none", "lzo", "gz" };
+   for (int i = 0; i < schemes.length; i++) {
+     final String scheme = schemes[i];
+     final int inputSize;
+     final int outputSize;
+     if (scheme.equals("none")) {
+       inputSize = options.fsInputBufferSizeNone;
+       outputSize = options.fsOutputBufferSizeNone;
+     } else if (scheme.equals("lzo")) {
+       inputSize = options.fsInputBufferSizeLzo;
+       outputSize = options.fsOutputBufferSizeLzo;
+     } else {
+       inputSize = options.fsInputBufferSizeGz;
+       outputSize = options.fsOutputBufferSizeGz;
+     }
+     conf.setInt("tfile.fs.input.buffer.size", inputSize);
+     conf.setInt("tfile.fs.output.buffer.size", outputSize);
+     compareRun(scheme);
+   }
+ }
+
+ /**
+  * Renders the key/value length ranges, minimum block size (KB), target file
+  * size (MB) and compression scheme of {@code options} as one summary line.
+  */
+ private static String parameters2String(MyOptions options) {
+   final int maxKeyLength = options.keyLength * 2;
+   final int maxValueLength = options.valueLength * 2;
+   final double minBlockKb = (double) options.minBlockSize / 1024;
+   final double targetMb = (double) options.fileSize / 1024 / 1024;
+   return String.format(
+       "KLEN: %d-%d... VLEN: %d-%d...MinBlkSize: %.2fKB...Target Size: %.2fMB...Compression: ...%s",
+       options.keyLength, maxKeyLength, options.valueLength, maxValueLength,
+       minBlockKb, targetMb, options.compress);
+ }
+
+ /**
+  * Benchmark settings, initialised to defaults and optionally overridden from
+  * a command line. When the command line is invalid or asks for help,
+  * {@link #proceed()} returns false and the caller should not run.
+  */
+ private static class MyOptions {
+   String rootDir =
+       System.getProperty("test.build.data", "/tmp/tfile-test");
+   String compress = "gz";
+   String format = "tfile";
+   int dictSize = 1000;
+   int minWordLen = 5;
+   int maxWordLen = 20;
+   int keyLength = 50;
+   int valueLength = 100;
+   int minBlockSize = 256 * 1024;
+   int fsOutputBufferSize = 1;
+   int fsInputBufferSize = 0;
+   // special variable only for unit testing.
+   int fsInputBufferSizeNone = 0;
+   int fsInputBufferSizeGz = 0;
+   int fsInputBufferSizeLzo = 0;
+   int fsOutputBufferSizeNone = 1;
+   int fsOutputBufferSizeGz = 1;
+   int fsOutputBufferSizeLzo = 1;
+
+   // un-exposed parameters.
+   int osInputBufferSize = 64 * 1024;
+   int osOutputBufferSize = 64 * 1024;
+
+   long fileSize = 3 * 1024 * 1024;
+   long seed;
+
+   static final int OP_CREATE = 1;
+   static final int OP_READ = 2;
+   int op = OP_READ;
+
+   boolean proceed = false;
+
+   /**
+    * Parses {@code args} on top of the defaults. Parse errors, validation
+    * errors and malformed numeric values are reported on standard output
+    * and leave {@code proceed} false instead of propagating to the caller.
+    */
+   public MyOptions(String[] args) {
+     seed = System.nanoTime();
+
+     try {
+       Options opts = buildOptions();
+       CommandLineParser parser = new GnuParser();
+       CommandLine line = parser.parse(opts, args, true);
+       processOptions(line, opts);
+       validateOptions();
+     }
+     catch (ParseException e) {
+       System.out.println(e.getMessage());
+       System.out.println("Try \"--help\" option for details.");
+       setStopProceed();
+     }
+     catch (NumberFormatException e) {
+       // Integer.parseInt/Long.parseLong reject the value; treat it like any
+       // other bad command line rather than crashing with a stack trace.
+       System.out.println("Bad numeric option value: " + e.getMessage());
+       System.out.println("Try \"--help\" option for details.");
+       setStopProceed();
+     }
+   }
+
+   /** @return true when the options were parsed and validated successfully */
+   public boolean proceed() {
+     return proceed;
+   }
+
+   /** Declares every command-line option understood by the benchmark. */
+   private Options buildOptions() {
+     Option compress =
+         OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
+             .hasArg().withDescription("compression scheme").create('c');
+
+     Option ditSize =
+         OptionBuilder.withLongOpt("dict").withArgName("size").hasArg()
+             .withDescription("number of dictionary entries").create('d');
+
+     Option fileSize =
+         OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
+             .hasArg().withDescription("target size of the file (in MB).")
+             .create('s');
+
+     Option format =
+         OptionBuilder.withLongOpt("format").withArgName("[tfile|seqfile]")
+             .hasArg().withDescription("choose TFile or SeqFile").create('f');
+
+     Option fsInputBufferSz =
+         OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
+             .hasArg().withDescription(
+                 "size of the file system input buffer (in bytes).").create(
+                 'i');
+
+     Option fsOutputBufferSize =
+         OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
+             .hasArg().withDescription(
+                 "size of the file system output buffer (in bytes).").create(
+                 'o');
+
+     Option keyLen =
+         OptionBuilder
+             .withLongOpt("key-length")
+             .withArgName("length")
+             .hasArg()
+             .withDescription(
+                 "base length of the key (in bytes), actual length varies in [base, 2*base)")
+             .create('k');
+
+     Option valueLen =
+         OptionBuilder
+             .withLongOpt("value-length")
+             .withArgName("length")
+             .hasArg()
+             .withDescription(
+                 "base length of the value (in bytes), actual length varies in [base, 2*base)")
+             .create('v');
+
+     Option wordLen =
+         OptionBuilder.withLongOpt("word-length").withArgName("min,max")
+             .hasArg().withDescription(
+                 "range of dictionary word length (in bytes)").create('w');
+
+     Option blockSz =
+         OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
+             .withDescription("minimum block size (in KB)").create('b');
+
+     Option seed =
+         OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
+             .withDescription("specify the seed").create('S');
+
+     Option operation =
+         OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
+             .withDescription(
+                 "action: read-only, create-only, read-after-create").create(
+                 'x');
+
+     Option rootDir =
+         OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
+             .withDescription(
+                 "specify root directory where files will be created.")
+             .create('r');
+
+     Option help =
+         OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
+             "show this screen").create("h");
+
+     // The seed option must be registered too; otherwise -S/--seed is never
+     // recognised and processOptions() can never see it.
+     return new Options().addOption(compress).addOption(ditSize).addOption(
+         fileSize).addOption(format).addOption(fsInputBufferSz).addOption(
+         fsOutputBufferSize).addOption(keyLen).addOption(wordLen).addOption(
+         blockSz).addOption(rootDir).addOption(valueLen).addOption(operation)
+         .addOption(seed).addOption(help);
+
+   }
+
+   /**
+    * Copies the parsed command line into the fields. When help is requested
+    * the usage text is printed and {@code proceed} stays false.
+    *
+    * @throws ParseException on a malformed word-length range or an unknown
+    *                        operation specifier
+    */
+   private void processOptions(CommandLine line, Options opts)
+       throws ParseException {
+     // --help -h and --version -V must be processed first.
+     if (line.hasOption('h')) {
+       HelpFormatter formatter = new HelpFormatter();
+       System.out.println("TFile and SeqFile benchmark.");
+       System.out.println();
+       formatter.printHelp(100,
+           "java ... TestTFileSeqFileComparison [options]",
+           "\nSupported options:", opts, "");
+       return;
+     }
+
+     if (line.hasOption('c')) {
+       compress = line.getOptionValue('c');
+     }
+
+     if (line.hasOption('d')) {
+       dictSize = Integer.parseInt(line.getOptionValue('d'));
+     }
+
+     if (line.hasOption('s')) {
+       // given in MB on the command line, stored in bytes
+       fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
+     }
+
+     if (line.hasOption('f')) {
+       format = line.getOptionValue('f');
+     }
+
+     if (line.hasOption('i')) {
+       fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
+     }
+
+     if (line.hasOption('o')) {
+       fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
+     }
+
+     if (line.hasOption('k')) {
+       keyLength = Integer.parseInt(line.getOptionValue('k'));
+     }
+
+     if (line.hasOption('v')) {
+       valueLength = Integer.parseInt(line.getOptionValue('v'));
+     }
+
+     if (line.hasOption('b')) {
+       // given in KB on the command line, stored in bytes
+       minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
+     }
+
+     if (line.hasOption('r')) {
+       rootDir = line.getOptionValue('r');
+     }
+
+     if (line.hasOption('S')) {
+       seed = Long.parseLong(line.getOptionValue('S'));
+     }
+
+     if (line.hasOption('w')) {
+       String min_max = line.getOptionValue('w');
+       StringTokenizer st = new StringTokenizer(min_max, " \t,");
+       if (st.countTokens() != 2) {
+         throw new ParseException("Bad word length specification: " + min_max);
+       }
+       minWordLen = Integer.parseInt(st.nextToken());
+       maxWordLen = Integer.parseInt(st.nextToken());
+     }
+
+     if (line.hasOption('x')) {
+       String strOp = line.getOptionValue('x');
+       if (strOp.equals("r")) {
+         op = OP_READ;
+       }
+       else if (strOp.equals("w")) {
+         op = OP_CREATE;
+       }
+       else if (strOp.equals("rw")) {
+         op = OP_CREATE | OP_READ;
+       }
+       else {
+         throw new ParseException("Unknown action specifier: " + strOp);
+       }
+     }
+
+     proceed = true;
+   }
+
+   /**
+    * Checks the compression scheme, the file format and the word-length
+    * range.
+    *
+    * @throws ParseException when any of them is invalid
+    */
+   private void validateOptions() throws ParseException {
+     if (!compress.equals("none") && !compress.equals("lzo")
+         && !compress.equals("gz")) {
+       throw new ParseException("Unknown compression scheme: " + compress);
+     }
+
+     if (!format.equals("tfile") && !format.equals("seqfile")) {
+       throw new ParseException("Unknown file format: " + format);
+     }
+
+     if (minWordLen >= maxWordLen) {
+       throw new ParseException(
+           "Max word length must be greater than min word length.");
+     }
+   }
+
+   private void setStopProceed() {
+     proceed = false;
+   }
+
+   /** @return true when the selected operation includes file creation */
+   public boolean doCreate() {
+     return (op & OP_CREATE) != 0;
+   }
+
+   /** @return true when the selected operation includes reading */
+   public boolean doRead() {
+     return (op & OP_READ) != 0;
+   }
+ }
+
+ /**
+  * Command-line entry point: runs the create and/or read benchmark for the
+  * format selected by the options. A created file is deleted after reading
+  * only when it was created in the same run.
+  */
+ public static void main(String[] args) throws IOException {
+   final TestTFileSeqFileComparison testCase =
+       new TestTFileSeqFileComparison();
+   final MyOptions options = new MyOptions(args);
+   if (!options.proceed) {
+     return;
+   }
+   testCase.options = options;
+   final String parameters = parameters2String(options);
+
+   testCase.setUp();
+   final boolean useTFile = testCase.options.format.equals("tfile");
+   if (useTFile && options.doCreate()) {
+     testCase.createTFile(parameters, options.compress);
+   }
+   if (useTFile && options.doRead()) {
+     testCase.readTFile(parameters, options.doCreate());
+   }
+   if (!useTFile && options.doCreate()) {
+     testCase.createSeqFile(parameters, options.compress);
+   }
+   if (!useTFile && options.doRead()) {
+     testCase.readSeqFile(parameters, options.doCreate());
+   }
+   testCase.tearDown();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSplit.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSplit.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSplit.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+public class TestTFileSplit extends TestCase {
+ private static String ROOT =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+
+ private final static int BLOCK_SIZE = 64 * 1024;
+
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+
+ private FileSystem fs;
+ private Configuration conf;
+ private Path path;
+ private Random random = new Random();
+
+ private String comparator = "memcmp";
+ private String outputFile = "TestTFileSplit";
+
+ /**
+  * Writes {@code count} rows with ascending keys into a new TFile named
+  * after the compression scheme, and remembers its path, file system and
+  * configuration in the fixture fields. The writer and stream are closed
+  * even when appending fails.
+  */
+ void createFile(int count, String compress) throws IOException {
+   conf = new Configuration();
+   path = new Path(ROOT, outputFile + "." + compress);
+   fs = path.getFileSystem(conf);
+   FSDataOutputStream out = fs.create(path);
+   try {
+     Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf);
+     try {
+       for (int nx = 0; nx < count; nx++) {
+         byte[] key = composeSortedKey(KEY, count, nx).getBytes();
+         byte[] value = (VALUE + nx).getBytes();
+         writer.append(key, value);
+       }
+     } finally {
+       writer.close();
+     }
+   } finally {
+     out.close();
+   }
+ }
+
+ /**
+  * Splits the file into 10 byte ranges, scans each range and checks that
+  * every range yields at least one row and that the ranges together yield
+  * exactly the number of entries the reader reports. Scanners, reader and
+  * input stream are closed even when an assertion fails.
+  */
+ void readFile() throws IOException {
+   long fileLength = fs.getFileStatus(path).getLen();
+   int numSplit = 10;
+   long splitSize = fileLength / numSplit + 1;
+
+   org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
+   try {
+     Reader reader = new Reader(in, fileLength, conf);
+     try {
+       long offset = 0;
+       long rowCount = 0;
+       for (int i = 0; i < numSplit; ++i, offset += splitSize) {
+         Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
+         int count = 0;
+         try {
+           BytesWritable key = new BytesWritable();
+           BytesWritable value = new BytesWritable();
+           while (!scanner.atEnd()) {
+             scanner.entry().get(key, value);
+             ++count;
+             scanner.advance();
+           }
+         } finally {
+           scanner.close();
+         }
+         Assert.assertTrue(count > 0);
+         rowCount += count;
+       }
+       // JUnit expects (expected, actual); the reader's count is the reference.
+       Assert.assertEquals(reader.getEntryCount(), rowCount);
+     } finally {
+       reader.close();
+     }
+   } finally {
+     in.close();
+   }
+ }
+
+ /* Similar to readFile(), tests the scanner created
+ * by record numbers rather than the offsets. Each of the numSplits record
+ * ranges must report consecutive record numbers starting at its first
+ * record and contain exactly (endRec - startRec) rows; a range starting at
+ * the total record count must be empty. All scanners, the reader and the
+ * input stream are closed even when an assertion fails.
+ */
+ void readRowSplits(int numSplits) throws IOException {
+
+   org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
+   try {
+     Reader reader = new Reader(in, fs.getFileStatus(path).getLen(), conf);
+     try {
+       long totalRecords = reader.getEntryCount();
+       for (int i = 0; i < numSplits; i++) {
+         long startRec = i * totalRecords / numSplits;
+         long endRec = (i + 1) * totalRecords / numSplits;
+         if (i == numSplits - 1) {
+           endRec = totalRecords;
+         }
+         Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
+         long count = 0;
+         try {
+           BytesWritable key = new BytesWritable();
+           BytesWritable value = new BytesWritable();
+           long x = startRec;
+           while (!scanner.atEnd()) {
+             // The record number is checked before and after reading the
+             // entry, as in the original test.
+             assertEquals("Incorrect RecNum returned by scanner", x,
+                 scanner.getRecordNum());
+             scanner.entry().get(key, value);
+             ++count;
+             assertEquals("Incorrect RecNum returned by scanner", x,
+                 scanner.getRecordNum());
+             scanner.advance();
+             ++x;
+           }
+         } finally {
+           scanner.close();
+         }
+         Assert.assertEquals(endRec - startRec, count);
+       }
+       // make sure specifying range at the end gives zero records.
+       Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
+       try {
+         Assert.assertTrue(scanner.atEnd());
+       } finally {
+         scanner.close();
+       }
+     } finally {
+       reader.close();
+     }
+   } finally {
+     in.close();
+   }
+ }
+
+ /**
+  * Builds a key of the form prefix + value zero-padded to ten digits.
+  * The {@code total} argument is not used.
+  */
+ static String composeSortedKey(String prefix, int total, int value) {
+   final String layout = "%s%010d";
+   final String composed = String.format(layout, prefix, value);
+   return composed;
+ }
+
+ /**
+  * Checks the reader's record-number lookups: offset 0 maps to record 0,
+  * offsets at or beyond the file length map to the entry count, and
+  * location/record-number conversion round-trips for the first 100 records,
+  * the last 99 records, a random contiguous range and 1000 random records.
+  * The reader and input stream are closed even when an assertion fails.
+  */
+ void checkRecNums() throws IOException {
+   long fileLen = fs.getFileStatus(path).getLen();
+   org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
+   try {
+     Reader reader = new Reader(in, fileLen, conf);
+     try {
+       long totalRecs = reader.getEntryCount();
+       // begin falls in [0, totalRecs/2), end in (totalRecs/2, totalRecs].
+       long begin = random.nextLong() % (totalRecs / 2);
+       if (begin < 0) {
+         begin += (totalRecs / 2);
+       }
+       long end = random.nextLong() % (totalRecs / 2);
+       if (end < 0) {
+         end += (totalRecs / 2);
+       }
+       end += (totalRecs / 2) + 1;
+
+       assertEquals("RecNum for offset=0 should be 0", 0, reader
+           .getRecordNumNear(0));
+       for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) {
+         assertEquals("RecNum for offset>=fileLen should be total entries",
+             totalRecs, reader.getRecordNumNear(x));
+       }
+
+       for (long i = 0; i < 100; ++i) {
+         assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+             .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+       }
+
+       for (long i = 1; i < 100; ++i) {
+         long x = totalRecs - i;
+         assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+             .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+       }
+
+       for (long i = begin; i < end; ++i) {
+         assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+             .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+       }
+
+       for (int i = 0; i < 1000; ++i) {
+         long x = random.nextLong() % totalRecs;
+         if (x < 0) {
+           x += totalRecs;
+         }
+         assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+             .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+       }
+     } finally {
+       reader.close();
+     }
+   } finally {
+     in.close();
+   }
+ }
+
+ /**
+  * Runs the record-number, byte-range and row-split checks twice: on an
+  * uncompressed file of 100000 rows (10 row splits) and on a GZ-compressed
+  * file of 500000 rows (83 row splits), deleting the file after each round.
+  */
+ public void testSplit() throws IOException {
+   System.out.println("testSplit");
+
+   final String uncompressed = Compression.Algorithm.NONE.getName();
+   createFile(100000, uncompressed);
+   checkRecNums();
+   readFile();
+   readRowSplits(10);
+   fs.delete(path, true);
+
+   final String gzipped = Compression.Algorithm.GZ.getName();
+   createFile(500000, gzipped);
+   checkRecNums();
+   readFile();
+   readRowSplits(83);
+   fs.delete(path, true);
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileStreams.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileStreams.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileStreams.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+/**
+ * Streaming interfaces test case class using GZ compression codec, base class
+ * of none and LZO compression classes.
+ * <p>
+ * Every test starts with a fresh {@link Writer} on a newly created file (see
+ * {@link #setUp()}) and appends records through the streaming
+ * {@code prepareAppendKey} / {@code prepareAppendValue} API. When
+ * {@link #skip} is set, every test returns immediately.
+ */
+public class TestTFileStreams extends TestCase {
+  private static final String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+
+  private final static int BLOCK_SIZE = 512;
+  private final static int K = 1024;
+  private final static int M = K * K;
+  /** When true, all tests are no-ops and tearDown leaves the file alone. */
+  protected boolean skip = false;
+  private FileSystem fs;
+  private Configuration conf;
+  private Path path;
+  private FSDataOutputStream out;
+  Writer writer;
+
+  private String compression = Compression.Algorithm.GZ.getName();
+  private String comparator = "memcmp";
+  private String outputFile = "TFileTestStreams";
+
+  /**
+   * Overrides the codec, comparator and output file name read by
+   * {@link #setUp()}; has to be called before {@code setUp()} to take effect.
+   */
+  public void init(String compression, String comparator, String outputFile) {
+    this.compression = compression;
+    this.comparator = comparator;
+    this.outputFile = outputFile;
+  }
+
+  /** Creates the output file under {@code ROOT} and opens a writer on it. */
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    path = new Path(ROOT, outputFile);
+    fs = path.getFileSystem(conf);
+    out = fs.create(path);
+    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+  }
+
+  /** Closes whatever is still open and deletes the output file. */
+  @Override
+  public void tearDown() throws IOException {
+    if (!skip) {
+      try {
+        closeOutput();
+      } catch (Exception e) {
+        // Best effort only: several negative tests leave the writer in a
+        // state in which close() throws. The file is deleted regardless.
+      }
+      fs.delete(path, true);
+    }
+  }
+
+  public void testNoEntry() throws IOException {
+    if (skip) {
+      return;
+    }
+    closeOutput();
+    TestTFileByteArrays.readRecords(fs, path, 0, conf);
+  }
+
+  public void testOneEntryKnownLength() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(1, true, true);
+
+    TestTFileByteArrays.readRecords(fs, path, 1, conf);
+  }
+
+  public void testOneEntryUnknownLength() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(1, false, false);
+
+    // TODO: will throw exception at getValueLength, it's inconsistent though;
+    // getKeyLength returns a value correctly, though initial length is -1
+    TestTFileByteArrays.readRecords(fs, path, 1, conf);
+  }
+
+  /** Known key length, unknown value length. */
+  public void testOneEntryMixedLengths1() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(1, true, false);
+
+    TestTFileByteArrays.readRecords(fs, path, 1, conf);
+  }
+
+  /** Unknown key length, known value length. */
+  public void testOneEntryMixedLengths2() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(1, false, true);
+
+    TestTFileByteArrays.readRecords(fs, path, 1, conf);
+  }
+
+  public void testTwoEntriesKnownLength() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(2, true, true);
+
+    TestTFileByteArrays.readRecords(fs, path, 2, conf);
+  }
+
+  /** Negative test: closing after a key that has no value must fail. */
+  public void testFailureAddKeyWithoutValue() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream dos = writer.prepareAppendKey(-1);
+    dos.write("key0".getBytes());
+    try {
+      closeOutput();
+      fail("Cannot add only a key without a value. ");
+    } catch (IllegalStateException e) {
+      // noop, expecting an exception
+    }
+  }
+
+  /** Negative test: a value cannot be appended before a key. */
+  public void testFailureAddValueWithoutKey() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outValue = null;
+    try {
+      outValue = writer.prepareAppendValue(6);
+      outValue.write("value0".getBytes());
+      fail("Cannot add a value without adding key first. ");
+    } catch (Exception e) {
+      // noop, expecting an exception
+    } finally {
+      if (outValue != null) {
+        outValue.close();
+      }
+    }
+  }
+
+  /**
+   * Negative test: writing 4 bytes into a key declared as 2 bytes must raise
+   * an IOException. The follow-up value append is attempted but its outcome
+   * is not asserted.
+   */
+  public void testFailureOneEntryKnownLength() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(2);
+    try {
+      outKey.write("key0".getBytes());
+      fail("Specified key length mismatched the actual key length.");
+    } catch (IOException e) {
+      // noop, expecting an exception
+    }
+
+    DataOutputStream outValue = null;
+    try {
+      outValue = writer.prepareAppendValue(6);
+      outValue.write("value0".getBytes());
+    } catch (Exception e) {
+      // noop, expecting an exception
+    }
+  }
+
+  /** Negative test: key data longer than the declared 2 bytes. */
+  public void testFailureKeyTooLong() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(2);
+    try {
+      outKey.write("key0".getBytes());
+      outKey.close();
+      Assert.fail("Key is longer than requested.");
+    } catch (Exception e) {
+      // noop, expecting an exception
+    }
+  }
+
+  /**
+   * Negative test. Despite its name this method exercises a <em>value</em>
+   * that is shorter (6 bytes) than its declared length (15 bytes); the key
+   * itself is written correctly.
+   */
+  public void testFailureKeyTooShort() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(4);
+    outKey.write("key0".getBytes());
+    outKey.close();
+    DataOutputStream outValue = writer.prepareAppendValue(15);
+    try {
+      outValue.write("value0".getBytes());
+      outValue.close();
+      Assert.fail("Value is shorter than expected.");
+    } catch (Exception e) {
+      // noop, expecting an exception
+    }
+  }
+
+  /**
+   * Negative test: value data longer than the declared 3 bytes. Afterwards
+   * verifies that closing the already closed key stream again is harmless.
+   */
+  public void testFailureValueTooLong() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(4);
+    outKey.write("key0".getBytes());
+    outKey.close();
+    DataOutputStream outValue = writer.prepareAppendValue(3);
+    try {
+      outValue.write("value0".getBytes());
+      outValue.close();
+      Assert.fail("Value is longer than expected.");
+    } catch (Exception e) {
+      // noop, expecting an exception
+    }
+
+    try {
+      outKey.close();
+      outKey.close();
+    } catch (Exception e) {
+      Assert.fail("Second or more close() should have no effect.");
+    }
+  }
+
+  /**
+   * Negative test. Despite its name this method exercises a <em>key</em>
+   * that is shorter (4 bytes) than its declared length (8 bytes).
+   */
+  public void testFailureValueTooShort() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(8);
+    try {
+      outKey.write("key0".getBytes());
+      outKey.close();
+      Assert.fail("Key is shorter than expected.");
+    } catch (Exception e) {
+      // noop, expecting an exception
+    }
+  }
+
+  /** Closing a key stream repeatedly must not throw. */
+  public void testFailureCloseKeyStreamManyTimesInWriter() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(4);
+    try {
+      outKey.write("key0".getBytes());
+      outKey.close();
+    } catch (Exception e) {
+      // noop, expecting an exception
+    } finally {
+      try {
+        outKey.close();
+      } catch (Exception e) {
+        // no-op
+      }
+    }
+    // These two calls are the actual check: an exception here fails the test.
+    outKey.close();
+    outKey.close();
+    Assert.assertTrue("Multiple close should have no effect.", true);
+  }
+
+  /** Negative test: declaring a key length above 64K must be rejected. */
+  public void testFailureKeyLongerThan64K() throws IOException {
+    if (skip) {
+      return;
+    }
+    try {
+      writer.prepareAppendKey(64 * K + 1);
+      Assert.fail("Failed to handle key longer than 64K.");
+    } catch (IndexOutOfBoundsException e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  /**
+   * Negative test: streaming more than 64K into a key of undeclared length
+   * must fail with an EOFException.
+   */
+  public void testFailureKeyLongerThan64K_2() throws IOException {
+    if (skip) {
+      return;
+    }
+    DataOutputStream outKey = writer.prepareAppendKey(-1);
+    try {
+      byte[] buf = new byte[K];
+      Random rand = new Random();
+      for (int nx = 0; nx < K + 2; nx++) {
+        rand.nextBytes(buf);
+        outKey.write(buf);
+      }
+      outKey.close();
+      Assert.fail("Failed to handle key longer than 64K.");
+    } catch (EOFException e) {
+      // noop, expecting exceptions
+    } finally {
+      try {
+        closeOutput();
+      } catch (Exception e) {
+        // no-op
+      }
+    }
+  }
+
+  /** Negative test: reading a key into a buffer at offset -1 must fail. */
+  public void testFailureNegativeOffset() throws IOException {
+    if (skip) {
+      return;
+    }
+    writeRecords(2, true, true);
+
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner = reader.createScanner();
+      try {
+        byte[] buf = new byte[K];
+        try {
+          scanner.entry().getKey(buf, -1);
+          Assert.fail("Failed to handle key negative offset.");
+        } catch (Exception e) {
+          // noop, expecting exceptions
+        }
+      } finally {
+        // Released here so that a failing assertion does not leak the scanner.
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Verify that the compressed data size is less than raw data size
+   * (key and value lengths not declared up front).
+   *
+   * @throws IOException
+   */
+  public void testFailureCompressionNotWorking() throws IOException {
+    if (skip) {
+      return;
+    }
+    assertCompressionShrinksData(false, false);
+  }
+
+  /**
+   * Verify that the compressed data size is less than raw data size
+   * (key and value lengths declared up front).
+   *
+   * @throws IOException
+   */
+  public void testFailureCompressionNotWorking2() throws IOException {
+    if (skip) {
+      return;
+    }
+    assertCompressionShrinksData(true, true);
+  }
+
+  /**
+   * Writes 10000 records and, unless the codec is "none", asserts that the
+   * bytes written to the stream so far are fewer than the raw record bytes.
+   */
+  private void assertCompressionShrinksData(boolean knownKeyLength,
+      boolean knownValueLength) throws IOException {
+    long rawDataSize =
+        writeRecords(10000, knownKeyLength, knownValueLength, false);
+    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
+      Assert.assertTrue(out.getPos() < rawDataSize);
+    }
+    closeOutput();
+  }
+
+  /**
+   * Appends {@code count} key/value records through the streaming API.
+   *
+   * @param count number of records to write
+   * @param knownKeyLength whether the key length is declared up front
+   *          ({@code -1} is passed to the writer otherwise)
+   * @param knownValueLength whether the value length is declared up front
+   * @param close whether to close writer and stream after the last record
+   * @return sum over all records of the key and value byte lengths plus the
+   *         size of their VInt-encoded lengths
+   */
+  private long writeRecords(int count, boolean knownKeyLength,
+      boolean knownValueLength, boolean close) throws IOException {
+    long rawDataSize = 0;
+    for (int nx = 0; nx < count; nx++) {
+      // Encode once; the declared length is the number of bytes really written.
+      byte[] keyBytes =
+          TestTFileByteArrays.composeSortedKey("key", count, nx).getBytes();
+      DataOutputStream outKey =
+          writer.prepareAppendKey(knownKeyLength ? keyBytes.length : -1);
+      outKey.write(keyBytes);
+      outKey.close();
+
+      byte[] valueBytes = ("value" + nx).getBytes();
+      DataOutputStream outValue =
+          writer.prepareAppendValue(knownValueLength ? valueBytes.length : -1);
+      outValue.write(valueBytes);
+      outValue.close();
+
+      rawDataSize +=
+          WritableUtils.getVIntSize(keyBytes.length) + keyBytes.length
+              + WritableUtils.getVIntSize(valueBytes.length)
+              + valueBytes.length;
+    }
+    if (close) {
+      closeOutput();
+    }
+    return rawDataSize;
+  }
+
+  /** Same as the four-argument overload, closing the output afterwards. */
+  private long writeRecords(int count, boolean knownKeyLength,
+      boolean knownValueLength) throws IOException {
+    return writeRecords(count, knownKeyLength, knownValueLength, true);
+  }
+
+  /**
+   * Closes the writer and then the underlying stream. The stream is closed
+   * even when closing the writer throws; in that case {@code writer} is left
+   * set, so a later call attempts to close it again.
+   */
+  private void closeOutput() throws IOException {
+    try {
+      if (writer != null) {
+        writer.close();
+        writer = null;
+      }
+    } finally {
+      if (out != null) {
+        FSDataOutputStream stream = out;
+        out = null;
+        stream.close();
+      }
+    }
+  }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileUnsortedByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileUnsortedByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileUnsortedByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileUnsortedByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+public class TestTFileUnsortedByteArrays extends TestCase {
+ private static String ROOT =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+
+
+ private final static int BLOCK_SIZE = 512;
+ private final static int BUF_SIZE = 64;
+
+ private FileSystem fs;
+ private Configuration conf;
+ private Path path;
+ private FSDataOutputStream out;
+ private Writer writer;
+
+ private String compression = Compression.Algorithm.GZ.getName();
+ private String outputFile = "TFileTestUnsorted";
+ /*
+ * pre-sampled numbers of records in one block, based on the given the
+ * generated key and value strings
+ */
+ private int records1stBlock = 4314;
+ private int records2ndBlock = 4108;
+
+ public void init(String compression, String outputFile,
+ int numRecords1stBlock, int numRecords2ndBlock) {
+ this.compression = compression;
+ this.outputFile = outputFile;
+ this.records1stBlock = numRecords1stBlock;
+ this.records2ndBlock = numRecords2ndBlock;
+ }
+
+ @Override
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ path = new Path(ROOT, outputFile);
+ fs = path.getFileSystem(conf);
+ out = fs.create(path);
+ writer = new Writer(out, BLOCK_SIZE, compression, null, conf);
+ writer.append("keyZ".getBytes(), "valueZ".getBytes());
+ writer.append("keyM".getBytes(), "valueM".getBytes());
+ writer.append("keyN".getBytes(), "valueN".getBytes());
+ writer.append("keyA".getBytes(), "valueA".getBytes());
+ closeOutput();
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ fs.delete(path, true);
+ }
+
+ // we still can scan records in an unsorted TFile
+ public void testFailureScannerWithKeys() throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.assertFalse(reader.isSorted());
+ Assert.assertEquals((int) reader.getEntryCount(), 4);
+
+ try {
+ Scanner scanner =
+ reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
+ Assert
+ .fail("Failed to catch creating scanner with keys on unsorted file.");
+ }
+ catch (RuntimeException e) {
+ }
+ finally {
+ reader.close();
+ }
+ }
+
+ // we still can scan records in an unsorted TFile
+ public void testScan() throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.assertFalse(reader.isSorted());
+ Assert.assertEquals((int) reader.getEntryCount(), 4);
+
+ Scanner scanner = reader.createScanner();
+
+ try {
+
+ // read key and value
+ byte[] kbuf = new byte[BUF_SIZE];
+ int klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), "keyZ");
+
+ byte[] vbuf = new byte[BUF_SIZE];
+ int vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ");
+
+ scanner.advance();
+
+ // now try get value first
+ vbuf = new byte[BUF_SIZE];
+ vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), "valueM");
+
+ kbuf = new byte[BUF_SIZE];
+ klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), "keyM");
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ // we still can scan records in an unsorted TFile
+ public void testScanRange() throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.assertFalse(reader.isSorted());
+ Assert.assertEquals((int) reader.getEntryCount(), 4);
+
+ Scanner scanner = reader.createScanner();
+
+ try {
+
+ // read key and value
+ byte[] kbuf = new byte[BUF_SIZE];
+ int klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), "keyZ");
+
+ byte[] vbuf = new byte[BUF_SIZE];
+ int vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ");
+
+ scanner.advance();
+
+ // now try get value first
+ vbuf = new byte[BUF_SIZE];
+ vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), "valueM");
+
+ kbuf = new byte[BUF_SIZE];
+ klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), "keyM");
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ public void testFailureSeek() throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+
+ try {
+ // can't find ceil
+ try {
+ scanner.lowerBound("keyN".getBytes());
+ Assert.fail("Cannot search in a unsorted TFile!");
+ }
+ catch (Exception e) {
+ // noop, expecting excetions
+ }
+ finally {
+ }
+
+ // can't find higher
+ try {
+ scanner.upperBound("keyA".getBytes());
+ Assert.fail("Cannot search higher in a unsorted TFile!");
+ }
+ catch (Exception e) {
+ // noop, expecting excetions
+ }
+ finally {
+ }
+
+ // can't seek
+ try {
+ scanner.seekTo("keyM".getBytes());
+ Assert.fail("Cannot search a unsorted TFile!");
+ }
+ catch (Exception e) {
+ // noop, expecting excetions
+ }
+ finally {
+ }
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ private void closeOutput() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ out.close();
+ out = null;
+ }
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestVLong.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestVLong.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestVLong.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestVLong.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Round-trip and encoded-size tests for {@code Utils.writeVLong} and
+ * {@code Utils.readVLong}. Values are written to a scratch file, the file
+ * length is compared with the expected encoded size, and the values are read
+ * back and compared.
+ */
+public class TestVLong extends TestCase {
+  private static final String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+  private Configuration conf;
+  private FileSystem fs;
+  private Path path;
+  private String outputFile = "TestVLong";
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    path = new Path(ROOT, outputFile);
+    fs = path.getFileSystem(conf);
+    deleteScratchFile();
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    deleteScratchFile();
+  }
+
+  /** Removes the scratch file if a previous run or test left it behind. */
+  private void deleteScratchFile() throws IOException {
+    if (fs.exists(path)) {
+      fs.delete(path, false);
+    }
+  }
+
+  /** Every value of the byte range, checking the total encoded size. */
+  public void testVLongByte() throws IOException {
+    FSDataOutputStream out = fs.create(path);
+    try {
+      for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
+        Utils.writeVLong(out, i);
+      }
+    } finally {
+      out.close();
+    }
+    Assert.assertEquals("Incorrect encoded size", (1 << Byte.SIZE) + 96, fs
+        .getFileStatus(path).getLen());
+
+    FSDataInputStream in = fs.open(path);
+    try {
+      for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
+        long n = Utils.readVLong(in);
+        Assert.assertEquals(i, n);
+      }
+    } finally {
+      in.close();
+    }
+    fs.delete(path, false);
+  }
+
+  /**
+   * Writes every value of the short range shifted left by {@code shift} bits,
+   * reads them back and verifies them.
+   *
+   * @param shift number of bits each short value is shifted left by
+   * @return length in bytes of the encoded file
+   */
+  private long writeAndVerify(int shift) throws IOException {
+    FSDataOutputStream out = fs.create(path);
+    try {
+      for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
+        Utils.writeVLong(out, ((long) i) << shift);
+      }
+    } finally {
+      out.close();
+    }
+    FSDataInputStream in = fs.open(path);
+    try {
+      for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
+        long n = Utils.readVLong(in);
+        Assert.assertEquals(((long) i) << shift, n);
+      }
+    } finally {
+      in.close();
+    }
+    long ret = fs.getFileStatus(path).getLen();
+    fs.delete(path, false);
+    return ret;
+  }
+
+  public void testVLongShort() throws IOException {
+    long size = writeAndVerify(0);
+    Assert.assertEquals("Incorrect encoded size", (1 << Short.SIZE) * 2
+        + ((1 << Byte.SIZE) - 40)
+        * (1 << Byte.SIZE) - 128 - 32, size);
+  }
+
+  public void testVLong3Bytes() throws IOException {
+    long size = writeAndVerify(Byte.SIZE);
+    Assert.assertEquals("Incorrect encoded size", (1 << Short.SIZE) * 3
+        + ((1 << Byte.SIZE) - 32) * (1 << Byte.SIZE) - 40 - 1, size);
+  }
+
+  public void testVLong4Bytes() throws IOException {
+    long size = writeAndVerify(Byte.SIZE * 2);
+    Assert.assertEquals("Incorrect encoded size", (1 << Short.SIZE) * 4
+        + ((1 << Byte.SIZE) - 16) * (1 << Byte.SIZE) - 32 - 2, size);
+  }
+
+  public void testVLong5Bytes() throws IOException {
+    long size = writeAndVerify(Byte.SIZE * 3);
+    Assert.assertEquals("Incorrect encoded size", (1 << Short.SIZE) * 6 - 256
+        - 16 - 3, size);
+  }
+
+  /**
+   * Shared body of the 6/7/8 byte tests: shifts the short range so that the
+   * values span {@code bytes} bytes and checks the encoded size formula.
+   */
+  private void verifySixOrMoreBytes(int bytes) throws IOException {
+    long size = writeAndVerify(Byte.SIZE * (bytes - 2));
+    Assert.assertEquals("Incorrect encoded size", (1 << Short.SIZE)
+        * (bytes + 1) - 256 - bytes + 1, size);
+  }
+
+  public void testVLong6Bytes() throws IOException {
+    verifySixOrMoreBytes(6);
+  }
+
+  public void testVLong7Bytes() throws IOException {
+    verifySixOrMoreBytes(7);
+  }
+
+  public void testVLong8Bytes() throws IOException {
+    verifySixOrMoreBytes(8);
+  }
+
+  /** Round-trips 1M random values of random bit width (1 to 64 bits). */
+  public void testVLongRandom() throws IOException {
+    int count = 1024 * 1024;
+    long data[] = new long[count];
+    Random rng = new Random();
+    for (int i = 0; i < data.length; ++i) {
+      int shift = rng.nextInt(Long.SIZE) + 1;
+      // 1L << 64 wraps around to 1L, so the full-width mask is special-cased.
+      long mask = (shift == Long.SIZE) ? -1L : (1L << shift) - 1;
+      long a = ((long) rng.nextInt()) << 32;
+      // The literal must be a long: the int literal 0xffffffff is -1 and
+      // would leave the sign-extended upper half in place.
+      long b = ((long) rng.nextInt()) & 0xffffffffL;
+      data[i] = (a + b) & mask;
+    }
+
+    FSDataOutputStream out = fs.create(path);
+    try {
+      for (int i = 0; i < data.length; ++i) {
+        Utils.writeVLong(out, data[i]);
+      }
+    } finally {
+      out.close();
+    }
+
+    FSDataInputStream in = fs.open(path);
+    try {
+      for (int i = 0; i < data.length; ++i) {
+        Assert.assertEquals(data[i], Utils.readVLong(in));
+      }
+    } finally {
+      in.close();
+    }
+    fs.delete(path, false);
+  }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/Timer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/Timer.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/Timer.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/Timer.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+/**
+ * Minimal stopwatch: records a start and a finish instant (milliseconds since
+ * the epoch) and reports the time elapsed between them. Also offers helpers
+ * that format a millisecond value with the pattern
+ * {@code yyyy-MM-dd HH:mm:ss}.
+ */
+public class Timer {
+  long startTimeEpoch;
+  long finishTimeEpoch;
+  private DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  /** Records the current time as the start of the measured interval. */
+  public void startTime() throws IOException {
+    this.startTimeEpoch = System.currentTimeMillis();
+  }
+
+  /** Records the current time as the end of the measured interval. */
+  public void stopTime() throws IOException {
+    this.finishTimeEpoch = System.currentTimeMillis();
+  }
+
+  /** @return finish time minus start time, in milliseconds */
+  public long getIntervalMillis() throws IOException {
+    final long elapsed = this.finishTimeEpoch - this.startTimeEpoch;
+    return elapsed;
+  }
+
+  /** Prints {@code message} to stdout, prefixed with the current time. */
+  public void printlnWithTimestamp(String message) throws IOException {
+    final String stamp = formatCurrentTime();
+    System.out.println(stamp + " " + message);
+  }
+
+  /**
+   * @param millis milliseconds since the epoch
+   * @return {@code millis} rendered as {@code yyyy-MM-dd HH:mm:ss}
+   */
+  public String formatTime(long millis) {
+    return this.formatter.format(Long.valueOf(millis));
+  }
+
+  /**
+   * @return the measured interval passed through {@link #formatTime(long)},
+   *         i.e. the interval interpreted as an instant after the epoch
+   */
+  public String getIntervalString() throws IOException {
+    return formatTime(getIntervalMillis());
+  }
+
+  /** @return the current time rendered by {@link #formatTime(long)} */
+  public String formatCurrentTime() {
+    final long now = System.currentTimeMillis();
+    return formatTime(now);
+  }
+
+}
+