Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/21 16:36:14 UTC
svn commit: r882929 [5/7] - in
/hadoop/pig/branches/branch-0.6/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/schema/
src/java/org/apache/hadoop/zebra/tfile/ src/java/org...
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFile.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFile.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFile.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+/**
+ * Test TFile features: basic read/write round trips, unsorted files, and
+ * meta blocks.
+ */
+public class TestTFile extends TestCase {
+ private static String ROOT =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+ private FileSystem fs;
+ private Configuration conf;
+ private static final int minBlockSize = 512;
+ private static final int largeVal = 3 * 1024 * 1024;
+ private static final String localFormatter = "%010d";
+
+ @Override
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ fs = FileSystem.get(conf);
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ // do nothing
+ }
+
+ // read a key from the scanner
+ public byte[] readKey(Scanner scanner) throws IOException {
+ int keylen = scanner.entry().getKeyLength();
+ byte[] read = new byte[keylen];
+ scanner.entry().getKey(read);
+ return read;
+ }
+
+ // read a value from the scanner
+ public byte[] readValue(Scanner scanner) throws IOException {
+ int valueLen = scanner.entry().getValueLength();
+ byte[] read = new byte[valueLen];
+ scanner.entry().getValue(read);
+ return read;
+ }
+
+ // read a long value from the scanner
+ public byte[] readLongValue(Scanner scanner, int len) throws IOException {
+ DataInputStream din = scanner.entry().getValueStream();
+ byte[] b = new byte[len];
+ din.readFully(b);
+ din.close();
+ return b;
+ }
+
+ // write some records into the tfile
+ // write them twice
+ private int writeSomeRecords(Writer writer, int start, int n)
+ throws IOException {
+ String value = "value";
+ for (int i = start; i < (start + n); i++) {
+ String key = String.format(localFormatter, i);
+ writer.append(key.getBytes(), (value + key).getBytes());
+ writer.append(key.getBytes(), (value + key).getBytes());
+ }
+ return (start + n);
+ }
+
+ // read the records and check
+ private int readAndCheckbytes(Scanner scanner, int start, int n)
+ throws IOException {
+ String value = "value";
+ for (int i = start; i < (start + n); i++) {
+ byte[] key = readKey(scanner);
+ byte[] val = readValue(scanner);
+ String keyStr = String.format(localFormatter, i);
+ String valStr = value + keyStr;
+ assertTrue("btyes for keys do not match " + keyStr + " "
+ + new String(key), Arrays.equals(keyStr.getBytes(), key));
+ assertTrue("bytes for vals do not match " + valStr + " "
+ + new String(val), Arrays.equals(
+ valStr.getBytes(), val));
+ assertTrue(scanner.advance());
+ key = readKey(scanner);
+ val = readValue(scanner);
+ assertTrue("btyes for keys do not match", Arrays.equals(
+ keyStr.getBytes(), key));
+ assertTrue("bytes for vals do not match", Arrays.equals(
+ valStr.getBytes(), val));
+ assertTrue(scanner.advance());
+ }
+ return (start + n);
+ }
+
+ // write some large records
+ // write them twice
+ private int writeLargeRecords(Writer writer, int start, int n)
+ throws IOException {
+ byte[] value = new byte[largeVal];
+ for (int i = start; i < (start + n); i++) {
+ String key = String.format(localFormatter, i);
+ writer.append(key.getBytes(), value);
+ writer.append(key.getBytes(), value);
+ }
+ return (start + n);
+ }
+
+ // read large records
+  // read them twice since they are duplicated
+ private int readLargeRecords(Scanner scanner, int start, int n)
+ throws IOException {
+ for (int i = start; i < (start + n); i++) {
+ byte[] key = readKey(scanner);
+ String keyStr = String.format(localFormatter, i);
+ assertTrue("btyes for keys do not match", Arrays.equals(
+ keyStr.getBytes(), key));
+ scanner.advance();
+ key = readKey(scanner);
+ assertTrue("btyes for keys do not match", Arrays.equals(
+ keyStr.getBytes(), key));
+ scanner.advance();
+ }
+ return (start + n);
+ }
+
+ // write empty keys and values
+ private void writeEmptyRecords(Writer writer, int n) throws IOException {
+ byte[] key = new byte[0];
+ byte[] value = new byte[0];
+ for (int i = 0; i < n; i++) {
+ writer.append(key, value);
+ }
+ }
+
+ // read empty keys and values
+ private void readEmptyRecords(Scanner scanner, int n) throws IOException {
+ byte[] key = new byte[0];
+ byte[] value = new byte[0];
+ byte[] readKey = null;
+ byte[] readValue = null;
+ for (int i = 0; i < n; i++) {
+ readKey = readKey(scanner);
+ readValue = readValue(scanner);
+ assertTrue("failed to match keys", Arrays.equals(readKey, key));
+ assertTrue("failed to match values", Arrays.equals(readValue, value));
+ assertTrue("failed to advance cursor", scanner.advance());
+ }
+ }
+
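+  // append through pre-sized streams: the writer is told the exact key/value
+  // length up front and the bytes are written through the returned stream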
+ private int writePrepWithKnownLength(Writer writer, int start, int n)
+ throws IOException {
+ // get the length of the key
+ String key = String.format(localFormatter, start);
+ int keyLen = key.getBytes().length;
+ String value = "value" + key;
+ int valueLen = value.getBytes().length;
+ for (int i = start; i < (start + n); i++) {
+ DataOutputStream out = writer.prepareAppendKey(keyLen);
+ String localKey = String.format(localFormatter, i);
+ out.write(localKey.getBytes());
+ out.close();
+ out = writer.prepareAppendValue(valueLen);
+ String localValue = "value" + localKey;
+ out.write(localValue.getBytes());
+ out.close();
+ }
+ return (start + n);
+ }
+
+ private int readPrepWithKnownLength(Scanner scanner, int start, int n)
+ throws IOException {
+ for (int i = start; i < (start + n); i++) {
+ String key = String.format(localFormatter, i);
+ byte[] read = readKey(scanner);
+ assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+ String value = "value" + key;
+ read = readValue(scanner);
+ assertTrue("values not equal", Arrays.equals(value.getBytes(), read));
+ scanner.advance();
+ }
+ return (start + n);
+ }
+
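+  // append through streams of unknown length: passing -1 defers the length
+  // until the returned stream is closed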
+  private int writePrepWithUnknownLength(Writer writer, int start, int n)
+ throws IOException {
+ for (int i = start; i < (start + n); i++) {
+ DataOutputStream out = writer.prepareAppendKey(-1);
+ String localKey = String.format(localFormatter, i);
+ out.write(localKey.getBytes());
+ out.close();
+ String value = "value" + localKey;
+ out = writer.prepareAppendValue(-1);
+ out.write(value.getBytes());
+ out.close();
+ }
+ return (start + n);
+ }
+
+ private int readPrepWithUnknownLength(Scanner scanner, int start, int n)
+ throws IOException {
+    for (int i = start; i < (start + n); i++) {
+ String key = String.format(localFormatter, i);
+ byte[] read = readKey(scanner);
+ assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+      try {
+        read = readValue(scanner);
+        fail("reading a value of unknown length in full should have thrown");
+      }
+      catch (Exception e) {
+        // expected: the value length is not known up front
+      }
+ String value = "value" + key;
+ read = readLongValue(scanner, value.getBytes().length);
+ assertTrue("values nto equal", Arrays.equals(read, value.getBytes()));
+ scanner.advance();
+ }
+ return (start + n);
+ }
+
+ private byte[] getSomeKey(int rowId) {
+ return String.format(localFormatter, rowId).getBytes();
+ }
+
+ private void writeRecords(Writer writer) throws IOException {
+ writeEmptyRecords(writer, 10);
+ int ret = writeSomeRecords(writer, 0, 100);
+ ret = writeLargeRecords(writer, ret, 1);
+ ret = writePrepWithKnownLength(writer, ret, 40);
+    ret = writePrepWithUnknownLength(writer, ret, 50);
+ writer.close();
+ }
+
+ private void readAllRecords(Scanner scanner) throws IOException {
+ readEmptyRecords(scanner, 10);
+ int ret = readAndCheckbytes(scanner, 0, 100);
+ ret = readLargeRecords(scanner, ret, 1);
+ ret = readPrepWithKnownLength(scanner, ret, 40);
+ ret = readPrepWithUnknownLength(scanner, ret, 50);
+ }
+
+ private FSDataOutputStream createFSOutput(Path name) throws IOException {
+ if (fs.exists(name)) fs.delete(name, true);
+ FSDataOutputStream fout = fs.create(name);
+ return fout;
+ }
+
+  /**
+   * Test basic read/write/seek operations with the given codec.
+   */
+ void basicWithSomeCodec(String codec) throws IOException {
+ Path ncTFile = new Path(ROOT, "basic.tfile");
+ FSDataOutputStream fout = createFSOutput(ncTFile);
+ Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf);
+ writeRecords(writer);
+ fout.close();
+    FSDataInputStream fin = fs.open(ncTFile);
+    Reader reader =
+        new Reader(fin, fs.getFileStatus(ncTFile).getLen(), conf);
+
+ Scanner scanner = reader.createScanner();
+ readAllRecords(scanner);
+    assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50)));
+ // read the key and see if it matches
+ byte[] readKey = readKey(scanner);
+ assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50),
+ readKey));
+
+ scanner.seekTo(new byte[0]);
+ byte[] val1 = readValue(scanner);
+ scanner.seekTo(new byte[0]);
+ byte[] val2 = readValue(scanner);
+ assertTrue(Arrays.equals(val1, val2));
+
+ // check for lowerBound
+ scanner.lowerBound(getSomeKey(50));
+ assertTrue("locaton lookup failed", scanner.currentLocation
+ .compareTo(reader.end()) < 0);
+ readKey = readKey(scanner);
+ assertTrue("seeked key does not match", Arrays.equals(readKey,
+ getSomeKey(50)));
+
+ // check for upper bound
+ scanner.upperBound(getSomeKey(50));
+ assertTrue("location lookup failed", scanner.currentLocation
+ .compareTo(reader.end()) < 0);
+ readKey = readKey(scanner);
+ assertTrue("seeked key does not match", Arrays.equals(readKey,
+ getSomeKey(51)));
+
+ scanner.close();
+ // test for a range of scanner
+ scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
+ readAndCheckbytes(scanner, 10, 50);
+ assertFalse(scanner.advance());
+ scanner.close();
+ reader.close();
+ fin.close();
+ fs.delete(ncTFile, true);
+ }
+
+ // unsorted with some codec
+ void unsortedWithSomeCodec(String codec) throws IOException {
+ Path uTfile = new Path(ROOT, "unsorted.tfile");
+ FSDataOutputStream fout = createFSOutput(uTfile);
+ Writer writer = new Writer(fout, minBlockSize, codec, null, conf);
+    writeRecords(writer); // writeRecords closes the writer
+    fout.close();
+    FSDataInputStream fin = fs.open(uTfile);
+    Reader reader =
+        new Reader(fin, fs.getFileStatus(uTfile).getLen(), conf);
+
+ Scanner scanner = reader.createScanner();
+ readAllRecords(scanner);
+ scanner.close();
+ reader.close();
+ fin.close();
+ fs.delete(uTfile, true);
+ }
+
+ public void testTFileFeatures() throws IOException {
+ basicWithSomeCodec("none");
+ basicWithSomeCodec("gz");
+ }
+
+ // test unsorted t files.
+ public void testUnsortedTFileFeatures() throws IOException {
+ unsortedWithSomeCodec("none");
+ unsortedWithSomeCodec("gz");
+ }
+
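+  // meta blocks are named, independently compressed sections of a TFile;
+  // writing a duplicate name raises MetaBlockAlreadyExists (exercised below)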
+ private void writeNumMetablocks(Writer writer, String compression, int n)
+ throws IOException {
+ for (int i = 0; i < n; i++) {
+ DataOutputStream dout =
+ writer.prepareMetaBlock("TfileMeta" + i, compression);
+ byte[] b = ("something to test" + i).getBytes();
+ dout.write(b);
+ dout.close();
+ }
+ }
+
+ private void someTestingWithMetaBlock(Writer writer, String compression)
+ throws IOException {
+ DataOutputStream dout = null;
+ writeNumMetablocks(writer, compression, 10);
+ try {
+ dout = writer.prepareMetaBlock("TfileMeta1", compression);
+      fail("expected MetaBlockAlreadyExists for a duplicate meta block name");
+    }
+    catch (MetaBlockAlreadyExists me) {
+      // expected
+ }
+ dout = writer.prepareMetaBlock("TFileMeta100", compression);
+ dout.close();
+ }
+
+ private void readNumMetablocks(Reader reader, int n) throws IOException {
+ int len = ("something to test" + 0).getBytes().length;
+ for (int i = 0; i < n; i++) {
+ DataInputStream din = reader.getMetaBlock("TfileMeta" + i);
+ byte b[] = new byte[len];
+ din.readFully(b);
+ assertTrue("faield to match metadata", Arrays.equals(
+ ("something to test" + i).getBytes(), b));
+ din.close();
+ }
+ }
+
+ private void someReadingWithMetaBlock(Reader reader) throws IOException {
+ DataInputStream din = null;
+ readNumMetablocks(reader, 10);
+ try {
+ din = reader.getMetaBlock("NO ONE");
+      fail("expected MetaBlockDoesNotExist for a missing meta block");
+    }
+    catch (MetaBlockDoesNotExist me) {
+      // expected
+ }
+ din = reader.getMetaBlock("TFileMeta100");
+ int read = din.read();
+ assertTrue("check for status", (read == -1));
+ din.close();
+ }
+
+ // test meta blocks for tfiles
+ public void testMetaBlocks() throws IOException {
+ Path mFile = new Path(ROOT, "meta.tfile");
+ FSDataOutputStream fout = createFSOutput(mFile);
+ Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
+ someTestingWithMetaBlock(writer, "none");
+ writer.close();
+ fout.close();
+ FSDataInputStream fin = fs.open(mFile);
+ Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
+ someReadingWithMetaBlock(reader);
+    reader.close();
+    fin.close();
+    fs.delete(mFile, true);
+ }
+}
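
For orientation, the write/read round trip exercised above reduces to the
following minimal sketch (an illustrative, non-authoritative example; the /tmp
path and the "none" codec are arbitrary choices):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.tfile.TFile.Reader;
    import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
    import org.apache.hadoop.zebra.tfile.TFile.Writer;

    public class TFileRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/tfile-roundtrip"); // illustrative path
        // write a few key/value pairs, sorted by the "memcmp" comparator
        FSDataOutputStream fout = fs.create(p);
        Writer writer = new Writer(fout, 512, "none", "memcmp", conf);
        for (int i = 0; i < 10; i++) {
          String key = String.format("%010d", i);
          writer.append(key.getBytes(), ("value" + key).getBytes());
        }
        writer.close();
        fout.close();
        // scan the pairs back in order
        FSDataInputStream fin = fs.open(p);
        Reader reader = new Reader(fin, fs.getFileStatus(p).getLen(), conf);
        Scanner scanner = reader.createScanner();
        while (!scanner.atEnd()) {
          byte[] key = new byte[scanner.entry().getKeyLength()];
          scanner.entry().getKey(key);
          byte[] val = new byte[scanner.entry().getValueLength()];
          scanner.entry().getValue(val);
          System.out.println(new String(key) + " -> " + new String(val));
          scanner.advance();
        }
        scanner.close();
        reader.close();
        fin.close();
        fs.delete(p, true);
      }
    }
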
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,790 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Location;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+/**
+ * Byte-arrays test case using the GZ compression codec; also the base class
+ * for the none and LZO compression test classes.
+ */
+public class TestTFileByteArrays extends TestCase {
+ private static String ROOT =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+ private final static int BLOCK_SIZE = 512;
+ private final static int BUF_SIZE = 64;
+ private final static int K = 1024;
+ protected boolean skip = false;
+
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+
+ private FileSystem fs;
+ private Configuration conf;
+ private Path path;
+ private FSDataOutputStream out;
+ private Writer writer;
+
+ private String compression = Compression.Algorithm.GZ.getName();
+ private String comparator = "memcmp";
+ private String outputFile = "TFileTestByteArrays";
+ /*
+   * pre-sampled numbers of records in one block, based on the generated key
+   * and value strings
+ */
+ // private int records1stBlock = 4314;
+ // private int records2ndBlock = 4108;
+ private int records1stBlock = 4480;
+ private int records2ndBlock = 4263;
+
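+  // subclasses call init() from their setUp() to select codec, comparator and
+  // per-block record counts before super.setUp() creates the writer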
+ public void init(String compression, String comparator, String outputFile,
+ int numRecords1stBlock, int numRecords2ndBlock) {
+ this.compression = compression;
+ this.comparator = comparator;
+ this.outputFile = outputFile;
+ this.records1stBlock = numRecords1stBlock;
+ this.records2ndBlock = numRecords2ndBlock;
+ }
+
+ @Override
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ path = new Path(ROOT, outputFile);
+ fs = path.getFileSystem(conf);
+ out = fs.create(path);
+ writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ if (!skip)
+ fs.delete(path, true);
+ }
+
+ public void testNoDataEntry() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.assertTrue(reader.isSorted());
+ Scanner scanner = reader.createScanner();
+ Assert.assertTrue(scanner.atEnd());
+ scanner.close();
+ reader.close();
+ }
+
+ public void testOneDataEntry() throws IOException {
+ if (skip)
+ return;
+ writeRecords(1);
+ readRecords(1);
+
+ checkBlockIndex(1, 0, 0);
+ readValueBeforeKey(1, 0);
+ readKeyWithoutValue(1, 0);
+ readValueWithoutKey(1, 0);
+ readKeyManyTimes(1, 0);
+ }
+
+ public void testTwoDataEntries() throws IOException {
+ if (skip)
+ return;
+ writeRecords(2);
+ readRecords(2);
+ }
+
+ /**
+ * Fill up exactly one block.
+ *
+ * @throws IOException
+ */
+ public void testOneBlock() throws IOException {
+ if (skip)
+ return;
+ // just under one block
+ writeRecords(records1stBlock);
+ readRecords(records1stBlock);
+ // last key should be in the first block (block 0)
+ checkBlockIndex(records1stBlock, records1stBlock - 1, 0);
+ }
+
+ /**
+ * One block plus one record.
+ *
+ * @throws IOException
+ */
+ public void testOneBlockPlusOneEntry() throws IOException {
+ if (skip)
+ return;
+ writeRecords(records1stBlock + 1);
+ readRecords(records1stBlock + 1);
+ checkBlockIndex(records1stBlock + 1, records1stBlock - 1, 0);
+ checkBlockIndex(records1stBlock + 1, records1stBlock, 1);
+ }
+
+ public void testTwoBlocks() throws IOException {
+ if (skip)
+ return;
+ writeRecords(records1stBlock + 5);
+ readRecords(records1stBlock + 5);
+ checkBlockIndex(records1stBlock + 5, records1stBlock + 4, 1);
+ }
+
+ public void testThreeBlocks() throws IOException {
+ if (skip)
+ return;
+ writeRecords(2 * records1stBlock + 5);
+ readRecords(2 * records1stBlock + 5);
+
+ checkBlockIndex(2 * records1stBlock + 5, 2 * records1stBlock + 4, 2);
+ // 1st key in file
+ readValueBeforeKey(2 * records1stBlock + 5, 0);
+ readKeyWithoutValue(2 * records1stBlock + 5, 0);
+ readValueWithoutKey(2 * records1stBlock + 5, 0);
+ readKeyManyTimes(2 * records1stBlock + 5, 0);
+ // last key in file
+ readValueBeforeKey(2 * records1stBlock + 5, 2 * records1stBlock + 4);
+ readKeyWithoutValue(2 * records1stBlock + 5, 2 * records1stBlock + 4);
+ readValueWithoutKey(2 * records1stBlock + 5, 2 * records1stBlock + 4);
+ readKeyManyTimes(2 * records1stBlock + 5, 2 * records1stBlock + 4);
+
+ // 1st key in mid block, verify block indexes then read
+ checkBlockIndex(2 * records1stBlock + 5, records1stBlock - 1, 0);
+ checkBlockIndex(2 * records1stBlock + 5, records1stBlock, 1);
+ readValueBeforeKey(2 * records1stBlock + 5, records1stBlock);
+ readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock);
+ readValueWithoutKey(2 * records1stBlock + 5, records1stBlock);
+ readKeyManyTimes(2 * records1stBlock + 5, records1stBlock);
+
+ // last key in mid block, verify block indexes then read
+ checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock
+ - 1, 1);
+ checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock,
+ 2);
+ readValueBeforeKey(2 * records1stBlock + 5, records1stBlock
+ + records2ndBlock - 1);
+ readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock
+ + records2ndBlock - 1);
+ readValueWithoutKey(2 * records1stBlock + 5, records1stBlock
+ + records2ndBlock - 1);
+ readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + records2ndBlock
+ - 1);
+
+ // mid in mid block
+ readValueBeforeKey(2 * records1stBlock + 5, records1stBlock + 10);
+ readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock + 10);
+ readValueWithoutKey(2 * records1stBlock + 5, records1stBlock + 10);
+ readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + 10);
+ }
+
+ Location locate(Scanner scanner, byte[] key) throws IOException {
+    if (scanner.seekTo(key)) {
+ return scanner.currentLocation;
+ }
+ return scanner.endLocation;
+ }
+
+ public void testLocate() throws IOException {
+ if (skip)
+ return;
+ writeRecords(3 * records1stBlock);
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+ Location loc2 =
+ locate(scanner, composeSortedKey(KEY, 3 * records1stBlock, 2)
+ .getBytes());
+ Location locLastIn1stBlock =
+ locate(scanner, composeSortedKey(KEY, 3 * records1stBlock,
+ records1stBlock - 1).getBytes());
+ Location locFirstIn2ndBlock =
+ locate(scanner, composeSortedKey(KEY, 3 * records1stBlock,
+ records1stBlock).getBytes());
+ Location locX = locate(scanner, "keyX".getBytes());
+ Assert.assertEquals(scanner.endLocation, locX);
+ scanner.close();
+ reader.close();
+ }
+
+ public void testFailureWriterNotClosed() throws IOException {
+ if (skip)
+ return;
+ Reader reader = null;
+ try {
+ reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.fail("Cannot read before closing the writer.");
+ }
+ catch (IOException e) {
+ // noop, expecting exceptions
+ }
+ finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ public void testFailureWriteMetaBlocksWithSameName() throws IOException {
+ if (skip)
+ return;
+ writer.append("keyX".getBytes(), "valueX".getBytes());
+
+ // create a new metablock
+ DataOutputStream outMeta =
+ writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+ outMeta.write(123);
+ outMeta.write("foo".getBytes());
+ outMeta.close();
+ // add the same metablock
+ try {
+ DataOutputStream outMeta2 =
+ writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+ Assert.fail("Cannot create metablocks with the same name.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ public void testFailureGetNonExistentMetaBlock() throws IOException {
+ if (skip)
+ return;
+ writer.append("keyX".getBytes(), "valueX".getBytes());
+
+ // create a new metablock
+ DataOutputStream outMeta =
+ writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+ outMeta.write(123);
+ outMeta.write("foo".getBytes());
+ outMeta.close();
+ closeOutput();
+
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ DataInputStream mb = reader.getMetaBlock("testX");
+ Assert.assertNotNull(mb);
+ mb.close();
+ try {
+ DataInputStream mbBad = reader.getMetaBlock("testY");
+ Assert.assertNull(mbBad);
+ Assert.fail("Error on handling non-existent metablocks.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ reader.close();
+ }
+
+ public void testFailureWriteRecordAfterMetaBlock() throws IOException {
+ if (skip)
+ return;
+ // write a key/value first
+ writer.append("keyX".getBytes(), "valueX".getBytes());
+ // create a new metablock
+ DataOutputStream outMeta =
+ writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+ outMeta.write(123);
+ outMeta.write("dummy".getBytes());
+ outMeta.close();
+ // add more key/value
+ try {
+ writer.append("keyY".getBytes(), "valueY".getBytes());
+ Assert.fail("Cannot add key/value after start adding meta blocks.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ public void testFailureReadValueManyTimes() throws IOException {
+ if (skip)
+ return;
+ writeRecords(5);
+
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+
+ byte[] vbuf = new byte[BUF_SIZE];
+ int vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0);
+ try {
+ scanner.entry().getValue(vbuf);
+ Assert.fail("Cannot get the value mlutiple times.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+
+ scanner.close();
+ reader.close();
+ }
+
+ public void testFailureBadCompressionCodec() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+ out = fs.create(path);
+ try {
+ writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
+ Assert.fail("Error on handling invalid compression codecs.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ // e.printStackTrace();
+ }
+ }
+
+ public void testFailureOpenEmptyFile() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+ // create an absolutely empty file
+ path = new Path(fs.getWorkingDirectory(), outputFile);
+ out = fs.create(path);
+ out.close();
+ try {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.fail("Error on handling empty files.");
+ }
+ catch (EOFException e) {
+ // noop, expecting exceptions
+ }
+ }
+
+ public void testFailureOpenRandomFile() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+    // create a random file
+ path = new Path(fs.getWorkingDirectory(), outputFile);
+ out = fs.create(path);
+ Random rand = new Random();
+ byte[] buf = new byte[K];
+    // fill with > 1 MB of data
+ for (int nx = 0; nx < K + 2; nx++) {
+ rand.nextBytes(buf);
+ out.write(buf);
+ }
+ out.close();
+ try {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Assert.fail("Error on handling random files.");
+ }
+ catch (IOException e) {
+ // noop, expecting exceptions
+ }
+ }
+
+ public void testFailureKeyLongerThan64K() throws IOException {
+ if (skip)
+ return;
+ byte[] buf = new byte[64 * K + 1];
+ Random rand = new Random();
+ rand.nextBytes(buf);
+ try {
+      writer.append(buf, "valueX".getBytes());
+      Assert.fail("Error on handling keys longer than 64K.");
+    }
+    catch (IndexOutOfBoundsException e) {
+      // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ public void testFailureOutOfOrderKeys() throws IOException {
+ if (skip)
+ return;
+ try {
+ writer.append("keyM".getBytes(), "valueM".getBytes());
+ writer.append("keyA".getBytes(), "valueA".getBytes());
+ Assert.fail("Error on handling out of order keys.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ // e.printStackTrace();
+ }
+
+ closeOutput();
+ }
+
+ public void testFailureNegativeOffset() throws IOException {
+ if (skip)
+ return;
+ try {
+ writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
+ Assert.fail("Error on handling negative offset.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ public void testFailureNegativeOffset_2() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+ try {
+ scanner.lowerBound("keyX".getBytes(), -1, 4);
+ Assert.fail("Error on handling negative offset.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ finally {
+      scanner.close();
+      reader.close();
+ }
+ closeOutput();
+ }
+
+ public void testFailureNegativeLength() throws IOException {
+ if (skip)
+ return;
+ try {
+ writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
+ Assert.fail("Error on handling negative length.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ public void testFailureNegativeLength_2() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+ try {
+ scanner.lowerBound("keyX".getBytes(), 0, -1);
+ Assert.fail("Error on handling negative length.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ closeOutput();
+ }
+
+ public void testFailureNegativeLength_3() throws IOException {
+ if (skip)
+ return;
+ writeRecords(3);
+
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+ try {
+ // test negative array offset
+ try {
+ scanner.seekTo("keyY".getBytes(), -1, 4);
+ Assert.fail("Failed to handle negative offset.");
+ } catch (Exception e) {
+ // noop, expecting exceptions
+ }
+
+ // test negative array length
+ try {
+ scanner.seekTo("keyY".getBytes(), 0, -2);
+ Assert.fail("Failed to handle negative key length.");
+ } catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ } finally {
+      scanner.close();
+      reader.close();
+ }
+ }
+
+ public void testFailureCompressionNotWorking() throws IOException {
+ if (skip)
+ return;
+ long rawDataSize = writeRecords(10 * records1stBlock, false);
+ if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
+ Assert.assertTrue(out.getPos() < rawDataSize);
+ }
+ closeOutput();
+ }
+
+ public void testFailureFileWriteNotAt0Position() throws IOException {
+ if (skip)
+ return;
+ closeOutput();
+ out = fs.create(path);
+ out.write(123);
+
+ try {
+ writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+ Assert.fail("Failed to catch file write not at position 0.");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ }
+ closeOutput();
+ }
+
+ private long writeRecords(int count) throws IOException {
+ return writeRecords(count, true);
+ }
+
+ private long writeRecords(int count, boolean close) throws IOException {
+ long rawDataSize = writeRecords(writer, count);
+ if (close) {
+ closeOutput();
+ }
+ return rawDataSize;
+ }
+
+ static long writeRecords(Writer writer, int count) throws IOException {
+ long rawDataSize = 0;
+ int nx;
+ for (nx = 0; nx < count; nx++) {
+ byte[] key = composeSortedKey(KEY, count, nx).getBytes();
+ byte[] value = (VALUE + nx).getBytes();
+ writer.append(key, value);
+ rawDataSize +=
+ WritableUtils.getVIntSize(key.length) + key.length
+ + WritableUtils.getVIntSize(value.length) + value.length;
+ }
+ return rawDataSize;
+ }
+
+  /**
+   * Insert leading 0's in front of the value to make the generated keys sort
+   * correctly.
+   *
+   * @param prefix common key prefix
+   * @param total total number of keys (unused)
+   * @param value index of this key
+   * @return the zero-padded key string
+   */
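+  // e.g. composeSortedKey("key", 100, 25) returns "key0000000025"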
+ static String composeSortedKey(String prefix, int total, int value) {
+ return String.format("%s%010d", prefix, value);
+ }
+
+  /**
+   * Calculate how many digits are in the base-10 representation of the value.
+   *
+   * @param value a non-negative integer
+   * @return the number of decimal digits
+   */
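+  // e.g. numberDigits(4480) == 4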
+  private static int numberDigits(int value) {
+    int digits = 1;
+    while ((value = value / 10) > 0) {
+      digits++;
+    }
+    return digits;
+  }
+
+ private void readRecords(int count) throws IOException {
+ readRecords(fs, path, count, conf);
+ }
+
+ static void readRecords(FileSystem fs, Path path, int count,
+ Configuration conf) throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+
+ try {
+ for (int nx = 0; nx < count; nx++, scanner.advance()) {
+ Assert.assertFalse(scanner.atEnd());
+ // Assert.assertTrue(scanner.next());
+
+ byte[] kbuf = new byte[BUF_SIZE];
+ int klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
+ count, nx));
+
+ byte[] vbuf = new byte[BUF_SIZE];
+ int vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx);
+ }
+
+ Assert.assertTrue(scanner.atEnd());
+ Assert.assertFalse(scanner.advance());
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ private void checkBlockIndex(int count, int recordIndex,
+ int blockIndexExpected) throws IOException {
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner = reader.createScanner();
+ scanner.seekTo(composeSortedKey(KEY, count, recordIndex).getBytes());
+ Assert.assertEquals(blockIndexExpected, scanner.currentLocation
+ .getBlockIndex());
+ scanner.close();
+ reader.close();
+ }
+
+ private void readValueBeforeKey(int count, int recordIndex)
+ throws IOException {
+ Reader reader =
+ new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner =
+ reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
+ .getBytes(), null);
+
+ try {
+ byte[] vbuf = new byte[BUF_SIZE];
+ int vlen = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf);
+ Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex);
+
+ byte[] kbuf = new byte[BUF_SIZE];
+ int klen = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf);
+ Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
+ count, recordIndex));
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ private void readKeyWithoutValue(int count, int recordIndex)
+ throws IOException {
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+ Scanner scanner =
+ reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
+ .getBytes(), null);
+
+ try {
+ // read the indexed key
+ byte[] kbuf1 = new byte[BUF_SIZE];
+ int klen1 = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf1);
+ Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+ count, recordIndex));
+
+ if (scanner.advance() && !scanner.atEnd()) {
+ // read the next key following the indexed
+ byte[] kbuf2 = new byte[BUF_SIZE];
+ int klen2 = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf2);
+ Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY,
+ count, recordIndex + 1));
+ }
+ }
+ finally {
+ scanner.close();
+ reader.close();
+ }
+ }
+
+ private void readValueWithoutKey(int count, int recordIndex)
+ throws IOException {
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+
+ Scanner scanner =
+ reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
+ .getBytes(), null);
+
+ byte[] vbuf1 = new byte[BUF_SIZE];
+ int vlen1 = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf1);
+ Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex);
+
+ if (scanner.advance() && !scanner.atEnd()) {
+ byte[] vbuf2 = new byte[BUF_SIZE];
+ int vlen2 = scanner.entry().getValueLength();
+ scanner.entry().getValue(vbuf2);
+ Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE
+ + (recordIndex + 1));
+ }
+
+ scanner.close();
+ reader.close();
+ }
+
+ private void readKeyManyTimes(int count, int recordIndex) throws IOException {
+ Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+
+ Scanner scanner =
+ reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
+ .getBytes(), null);
+
+ // read the indexed key
+ byte[] kbuf1 = new byte[BUF_SIZE];
+ int klen1 = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf1);
+ Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+ count, recordIndex));
+
+ klen1 = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf1);
+ Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+ count, recordIndex));
+
+ klen1 = scanner.entry().getKeyLength();
+ scanner.entry().getKey(kbuf1);
+ Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+ count, recordIndex));
+
+ scanner.close();
+ reader.close();
+ }
+
+ private void closeOutput() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileComparators.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileComparators.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileComparators.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileComparators.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+
+/**
+ * Test creating TFile writers with various comparator specifications.
+ */
+public class TestTFileComparators extends TestCase {
+ private static String ROOT =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+
+ private final static int BLOCK_SIZE = 512;
+ private FileSystem fs;
+ private Configuration conf;
+ private Path path;
+ private FSDataOutputStream out;
+ private Writer writer;
+
+ private String compression = Compression.Algorithm.GZ.getName();
+ private String outputFile = "TFileTestComparators";
+ /*
+   * pre-sampled numbers of records in one block, based on the generated key
+   * and value strings
+ */
+ // private int records1stBlock = 4314;
+ // private int records2ndBlock = 4108;
+ private int records1stBlock = 4480;
+ private int records2ndBlock = 4263;
+
+ @Override
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ path = new Path(ROOT, outputFile);
+ fs = path.getFileSystem(conf);
+ out = fs.create(path);
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ fs.delete(path, true);
+ }
+
+ // bad comparator format
+ public void testFailureBadComparatorNames() throws IOException {
+ try {
+ writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf);
+ Assert.fail("Failed to catch unsupported comparator names");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ e.printStackTrace();
+ }
+ }
+
+ // jclass that doesn't exist
+ public void testFailureBadJClassNames() throws IOException {
+ try {
+ writer =
+ new Writer(out, BLOCK_SIZE, compression,
+ "jclass: some.non.existence.clazz", conf);
+ Assert.fail("Failed to catch unsupported comparator names");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ e.printStackTrace();
+ }
+ }
+
+ // class exists but not a RawComparator
+ public void testFailureBadJClasses() throws IOException {
+ try {
+ writer =
+ new Writer(out, BLOCK_SIZE, compression,
+ "jclass:org.apache.hadoop.zebra.tfile.Chunk", conf);
+ Assert.fail("Failed to catch unsupported comparator names");
+ }
+ catch (Exception e) {
+ // noop, expecting exceptions
+ e.printStackTrace();
+ }
+ }
+
+ private void closeOutput() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+}
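
For reference, the comparator argument accepted by the TFile Writer is either
the built-in "memcmp" (raw byte order) or a "jclass:" prefix naming a
RawComparator implementation, as the next test file demonstrates. A minimal
sketch of the two valid forms (an illustrative example; path, block size and
codec are arbitrary choices):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.tfile.TFile.Writer;

    public class ComparatorChoices {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/tfile-cmp-demo"); // illustrative path
        FSDataOutputStream out = fs.create(p);
        // built-in raw byte-order comparator
        Writer writer = new Writer(out, 512, "gz", "memcmp", conf);
        writer.append("keyA".getBytes(), "valueA".getBytes());
        writer.append("keyB".getBytes(), "valueB".getBytes());
        writer.close();
        out.close();
        fs.delete(p, true);
        // a user-supplied comparator is named by class instead:
        //   new Writer(out, 512, "gz",
        //       "jclass:org.apache.hadoop.zebra.tfile.MyComparator", conf);
      }
    }
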
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileJClassComparatorByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileJClassComparatorByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileJClassComparatorByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileJClassComparatorByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Byte-arrays test case using the GZ compression codec with a user-supplied
+ * (jclass) comparator.
+ */
+
+public class TestTFileJClassComparatorByteArrays extends TestTFileByteArrays {
+ /**
+   * Test the GZ codec with a jclass comparator, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ init(Compression.Algorithm.GZ.getName(),
+ "jclass: org.apache.hadoop.zebra.tfile.MyComparator",
+ "TFileTestJClassComparator", 4480, 4263);
+ super.setUp();
+ }
+}
+
+class MyComparator implements RawComparator<byte[]>, Serializable {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
+ }
+
+}
+
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.zebra.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsByteArrays extends TestTFileByteArrays {
+ /**
+ * Test LZO compression codec, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ skip = !(Algorithm.LZO.isSupported());
+ if (skip) {
+ System.out.println("Skipped");
+ }
+
+ // TODO: sample the generated key/value records, and put the numbers below
+ init(Compression.Algorithm.LZO.getName(), "memcmp", "TFileTestCodecsLzo",
+ 2605, 2558);
+ if (!skip)
+ super.setUp();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsStreams.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsStreams.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsStreams.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsStreams.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.zebra.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsStreams extends TestTFileStreams {
+ /**
+ * Test LZO compression codec, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ skip = !(Algorithm.LZO.isSupported());
+ if (skip) {
+ System.out.println("Skipped");
+ }
+ init(Compression.Algorithm.LZO.getName(), "memcmp", "TFileTestCodecsLzo");
+ if (!skip)
+ super.setUp();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+public class TestTFileNoneCodecsByteArrays extends TestTFileByteArrays {
+ /**
+ * Test non-compression codec, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ init(Compression.Algorithm.NONE.getName(), "memcmp", "TFileTestCodecsNone",
+ 24, 24);
+ super.setUp();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Byte-arrays test case using the none compression codec with a user-supplied
+ * (jclass) comparator.
+ */
+
+public class TestTFileNoneCodecsJClassComparatorByteArrays extends TestTFileByteArrays {
+ /**
+   * Test the none codec with a jclass comparator, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ init(Compression.Algorithm.NONE.getName(),
+ "jclass: org.apache.hadoop.zebra.tfile.MyComparator",
+ "TestTFileNoneCodecsJClassComparatorByteArrays", 24, 24);
+ super.setUp();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsStreams.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsStreams.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsStreams.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsStreams.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+public class TestTFileNoneCodecsStreams extends TestTFileStreams {
+ /**
+ * Test non-compression codec, using the same test cases as in the ByteArrays.
+ */
+ @Override
+ public void setUp() throws IOException {
+ init(Compression.Algorithm.NONE.getName(), "memcmp", "TFileTestCodecsNone");
+ super.setUp();
+ }
+}
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeek.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeek.java?rev=882929&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeek.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeek.java Sat Nov 21 15:36:12 2009
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.zebra.tfile.TFile.Reader;
+import org.apache.hadoop.zebra.tfile.TFile.Writer;
+import org.apache.hadoop.zebra.tfile.TFile.Reader.Scanner;
+
+/**
+ * Test seek performance on a TFile.
+ */
+public class TestTFileSeek extends TestCase {
+ private MyOptions options;
+ private Configuration conf;
+ private Path path;
+ private FileSystem fs;
+ private NanoTimer timer;
+ private Random rng;
+ private DiscreteRNG keyLenGen;
+ private KVGenerator kvGen;
+
+ @Override
+ public void setUp() throws IOException {
+ if (options == null) {
+ options = new MyOptions(new String[0]);
+ }
+
+ conf = new Configuration();
+ conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
+ conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
+ path = new Path(new Path(options.rootDir), options.file);
+ fs = path.getFileSystem(conf);
+ timer = new NanoTimer(false);
+ rng = new Random(options.seed);
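+    // Key lengths are drawn from a Zipf distribution (skewed toward the
+    // minimum); value and dictionary word lengths are drawn uniformly from
+    // their configured ranges.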
+ keyLenGen =
+ new RandomDistribution.Zipf(new Random(rng.nextLong()),
+ options.minKeyLen, options.maxKeyLen, 1.2);
+ DiscreteRNG valLenGen =
+ new RandomDistribution.Flat(new Random(rng.nextLong()),
+ options.minValLength, options.maxValLength);
+ DiscreteRNG wordLenGen =
+ new RandomDistribution.Flat(new Random(rng.nextLong()),
+ options.minWordLen, options.maxWordLen);
+ kvGen =
+ new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
+ options.dictSize);
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ fs.delete(path, true);
+ }
+
+ private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
+ throws IOException {
+ if (fs.exists(name)) {
+ fs.delete(name, true);
+ }
+ FSDataOutputStream fout = fs.create(name);
+ return fout;
+ }
+
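+  // Writes random key/value pairs until the file on disk reaches the target
+  // size (checked every 1000 rows), then reports raw and on-disk write
+  // throughput.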
+ private void createTFile() throws IOException {
+ long totalBytes = 0;
+ FSDataOutputStream fout = createFSOutput(path, fs);
+ try {
+ Writer writer =
+ new Writer(fout, options.minBlockSize, options.compress, "memcmp",
+ conf);
+ try {
+ BytesWritable key = new BytesWritable();
+ BytesWritable val = new BytesWritable();
+ timer.start();
+ for (long i = 0; true; ++i) {
+ if (i % 1000 == 0) { // test the size for every 1000 rows.
+ if (fs.getFileStatus(path).getLen() >= options.fileSize) {
+ break;
+ }
+ }
+ kvGen.next(key, val, false);
+        writer.append(key.get(), 0, key.getSize(), val.get(), 0,
+            val.getSize());
+ totalBytes += key.getSize();
+ totalBytes += val.getSize();
+ }
+ timer.stop();
+ }
+ finally {
+ writer.close();
+ }
+ }
+ finally {
+ fout.close();
+ }
+    double duration = (double) timer.read() / 1000; // in us.
+ long fsize = fs.getFileStatus(path).getLen();
+
+    System.out.printf(
+        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
+        timer.toString(), (double) totalBytes / 1024 / 1024,
+        totalBytes / duration);
+ System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
+ timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
+ }
+
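+  // Performs options.seekCount seeks to randomly sampled keys. lowerBound()
+  // positions the scanner at the first entry whose key is >= the sampled key,
+  // so a seek only misses when the sampled key sorts past the last key in the
+  // file.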
+ public void seekTFile() throws IOException {
+ int miss = 0;
+ long totalBytes = 0;
+ FSDataInputStream fsdis = fs.open(path);
+ Reader reader =
+ new Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
+ KeySampler kSampler =
+ new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
+ keyLenGen);
+ Scanner scanner = reader.createScanner();
+ BytesWritable key = new BytesWritable();
+ BytesWritable val = new BytesWritable();
+ timer.reset();
+ timer.start();
+ for (int i = 0; i < options.seekCount; ++i) {
+ kSampler.next(key);
+ scanner.lowerBound(key.get(), 0, key.getSize());
+ if (!scanner.atEnd()) {
+ scanner.entry().get(key, val);
+ totalBytes += key.getSize();
+ totalBytes += val.getSize();
+ }
+ else {
+ ++miss;
+ }
+ }
+ timer.stop();
+ double duration = (double) timer.read() / 1000; // in us.
+    System.out.printf(
+        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
+        timer.toString(),
+        NanoTimer.nanoTimeToString(timer.read() / options.seekCount),
+        options.seekCount - miss, miss,
+        (double) totalBytes / 1024 / (options.seekCount - miss));
+  }
+
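+  // JUnit entry point: skips the test when the configured codec is not
+  // supported in this build (e.g. lzo without native libraries); otherwise
+  // creates and/or reads the file depending on the -x option.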
+ public void testSeeks() throws IOException {
+ String[] supported = TFile.getSupportedCompressionAlgorithms();
+ boolean proceed = false;
+ for (String c : supported) {
+ if (c.equals(options.compress)) {
+ proceed = true;
+ break;
+ }
+ }
+
+ if (!proceed) {
+ System.out.println("Skipped for " + options.compress);
+ return;
+ }
+
+ if (options.doCreate()) {
+ createTFile();
+ }
+
+ if (options.doRead()) {
+ seekTFile();
+ }
+ }
+
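+  // Parses comma- or whitespace-separated "min,max" pairs, as used by the
+  // -k and -v options.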
+ private static class IntegerRange {
+ private final int from, to;
+
+ public IntegerRange(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ public static IntegerRange parse(String s) throws ParseException {
+ StringTokenizer st = new StringTokenizer(s, " \t,");
+ if (st.countTokens() != 2) {
+ throw new ParseException("Bad integer specification: " + s);
+ }
+ int from = Integer.parseInt(st.nextToken());
+ int to = Integer.parseInt(st.nextToken());
+ return new IntegerRange(from, to);
+ }
+
+ public int from() {
+ return from;
+ }
+
+ public int to() {
+ return to;
+ }
+ }
+
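+  // Benchmark configuration assembled from the command line; the defaults
+  // create a ~3MB gz-compressed TFile and perform 1000 random seeks.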
+ private static class MyOptions {
+ // hard coded constants
+ int dictSize = 1000;
+ int minWordLen = 5;
+ int maxWordLen = 20;
+ int osInputBufferSize = 64 * 1024;
+ int osOutputBufferSize = 64 * 1024;
+ int fsInputBufferSizeNone = 0;
+ int fsInputBufferSizeLzo = 0;
+ int fsInputBufferSizeGz = 0;
+ int fsOutputBufferSizeNone = 1;
+ int fsOutputBufferSizeLzo = 1;
+ int fsOutputBufferSizeGz = 1;
+
+ String rootDir =
+ System.getProperty("test.build.data", "/tmp/tfile-test");
+ String file = "TestTFileSeek";
+ String compress = "gz";
+ int minKeyLen = 10;
+ int maxKeyLen = 50;
+ int minValLength = 100;
+ int maxValLength = 200;
+ int minBlockSize = 64 * 1024;
+ int fsOutputBufferSize = 1;
+ int fsInputBufferSize = 0;
+ long fileSize = 3 * 1024 * 1024;
+ long seekCount = 1000;
+ long seed;
+
+ static final int OP_CREATE = 1;
+ static final int OP_READ = 2;
+ int op = OP_CREATE | OP_READ;
+
+ boolean proceed = false;
+
+ public MyOptions(String[] args) {
+ seed = System.nanoTime();
+
+ try {
+ Options opts = buildOptions();
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = parser.parse(opts, args, true);
+ processOptions(line, opts);
+ validateOptions();
+ }
+ catch (ParseException e) {
+ System.out.println(e.getMessage());
+ System.out.println("Try \"--help\" option for details.");
+ setStopProceed();
+ }
+ }
+
+ public boolean proceed() {
+ return proceed;
+ }
+
+ private Options buildOptions() {
+ Option compress =
+ OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
+ .hasArg().withDescription("compression scheme").create('c');
+
+ Option fileSize =
+ OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
+ .hasArg().withDescription("target size of the file (in MB).")
+ .create('s');
+
+ Option fsInputBufferSz =
+ OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
+ .hasArg().withDescription(
+ "size of the file system input buffer (in bytes).").create(
+ 'i');
+
+ Option fsOutputBufferSize =
+ OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
+ .hasArg().withDescription(
+ "size of the file system output buffer (in bytes).").create(
+ 'o');
+
+ Option keyLen =
+ OptionBuilder
+ .withLongOpt("key-length")
+ .withArgName("min,max")
+ .hasArg()
+ .withDescription(
+ "the length range of the key (in bytes)")
+ .create('k');
+
+ Option valueLen =
+ OptionBuilder
+ .withLongOpt("value-length")
+ .withArgName("min,max")
+ .hasArg()
+ .withDescription(
+ "the length range of the value (in bytes)")
+ .create('v');
+
+ Option blockSz =
+ OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
+ .withDescription("minimum block size (in KB)").create('b');
+
+ Option seed =
+ OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
+ .withDescription("specify the seed").create('S');
+
+ Option operation =
+ OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
+ .withDescription(
+ "action: seek-only, create-only, seek-after-create").create(
+ 'x');
+
+ Option rootDir =
+ OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
+ .withDescription(
+ "specify root directory where files will be created.")
+ .create('r');
+
+ Option file =
+ OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
+ .withDescription("specify the file name to be created or read.")
+ .create('f');
+
+ Option seekCount =
+ OptionBuilder
+ .withLongOpt("seek")
+ .withArgName("count")
+ .hasArg()
+ .withDescription(
+ "specify how many seek operations we perform (requires -x r or -x rw.")
+ .create('n');
+
+      Option help =
+          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
+              "show this screen").create('h');
+
+      return new Options().addOption(compress).addOption(fileSize).addOption(
+          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
+          .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption(
+              operation).addOption(seekCount).addOption(file).addOption(seed)
+          .addOption(help);
+    }
+
+ private void processOptions(CommandLine line, Options opts)
+ throws ParseException {
+      // --help/-h must be processed before any other option.
+      if (line.hasOption('h')) {
+        HelpFormatter formatter = new HelpFormatter();
+        System.out.println("TFile seek benchmark.");
+        System.out.println();
+        formatter.printHelp(100,
+            "java ... TestTFileSeek [options]",
+            "\nSupported options:", opts, "");
+ return;
+ }
+
+ if (line.hasOption('c')) {
+ compress = line.getOptionValue('c');
+ }
+
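+      // Note: no -d option is registered in buildOptions(), so this branch
+      // only takes effect if a dict-size option is added there.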
+ if (line.hasOption('d')) {
+ dictSize = Integer.parseInt(line.getOptionValue('d'));
+ }
+
+ if (line.hasOption('s')) {
+ fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
+ }
+
+ if (line.hasOption('i')) {
+ fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
+ }
+
+ if (line.hasOption('o')) {
+ fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
+ }
+
+ if (line.hasOption('n')) {
+ seekCount = Integer.parseInt(line.getOptionValue('n'));
+ }
+
+ if (line.hasOption('k')) {
+ IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
+ minKeyLen = ir.from();
+ maxKeyLen = ir.to();
+ }
+
+ if (line.hasOption('v')) {
+ IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
+ minValLength = ir.from();
+ maxValLength = ir.to();
+ }
+
+ if (line.hasOption('b')) {
+ minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
+ }
+
+ if (line.hasOption('r')) {
+ rootDir = line.getOptionValue('r');
+ }
+
+ if (line.hasOption('f')) {
+ file = line.getOptionValue('f');
+ }
+
+ if (line.hasOption('S')) {
+ seed = Long.parseLong(line.getOptionValue('S'));
+ }
+
+ if (line.hasOption('x')) {
+ String strOp = line.getOptionValue('x');
+ if (strOp.equals("r")) {
+ op = OP_READ;
+ }
+ else if (strOp.equals("w")) {
+ op = OP_CREATE;
+ }
+ else if (strOp.equals("rw")) {
+ op = OP_CREATE | OP_READ;
+ }
+ else {
+ throw new ParseException("Unknown action specifier: " + strOp);
+ }
+ }
+
+ proceed = true;
+ }
+
+ private void validateOptions() throws ParseException {
+ if (!compress.equals("none") && !compress.equals("lzo")
+ && !compress.equals("gz")) {
+ throw new ParseException("Unknown compression scheme: " + compress);
+ }
+
+ if (minKeyLen >= maxKeyLen) {
+ throw new ParseException(
+ "Max key length must be greater than min key length.");
+ }
+
+ if (minValLength >= maxValLength) {
+ throw new ParseException(
+ "Max value length must be greater than min value length.");
+ }
+
+ if (minWordLen >= maxWordLen) {
+ throw new ParseException(
+ "Max word length must be greater than min word length.");
+ }
+ return;
+ }
+
+ private void setStopProceed() {
+ proceed = false;
+ }
+
+ public boolean doCreate() {
+ return (op & OP_CREATE) != 0;
+ }
+
+ public boolean doRead() {
+ return (op & OP_READ) != 0;
+ }
+ }
+
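+  // Runs the benchmark standalone: parses command-line options, then drives
+  // setUp/testSeeks/tearDown directly instead of going through the JUnit
+  // runner.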
+ public static void main(String[] argv) throws IOException {
+ TestTFileSeek testCase = new TestTFileSeek();
+ MyOptions options = new MyOptions(argv);
+
+    if (!options.proceed()) {
+      return;
+    }
+
+ testCase.options = options;
+ testCase.setUp();
+ testCase.testSeeks();
+ testCase.tearDown();
+ }
+}