Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:01 UTC

[2/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib
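
For orientation, a minimal sketch of how the relocated DTFile API is exercised by the tests below. It uses only Writer/Reader/Scanner calls that appear in the diffs; the path, block size and codec are illustrative, and the snippet assumes the usual org.apache.hadoop.conf/fs and org.apache.hadoop.io.file.tfile imports inside a method that throws IOException:

    Configuration conf = new Configuration();
    Path path = new Path("/tmp/dtfile-example");   // illustrative path
    FileSystem fs = path.getFileSystem(conf);

    // write a few sorted key/value pairs
    FSDataOutputStream out = fs.create(path);
    DTFile.Writer writer = new DTFile.Writer(out, 64 * 1024, "gz", "memcmp", conf);
    writer.append("key1".getBytes(), "value1".getBytes());
    writer.close();
    out.close();

    // read the value back through a scanner
    FSDataInputStream in = fs.open(path);
    DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path).getLen(), conf);
    DTFile.Reader.Scanner scanner = reader.createScanner();
    if (scanner.seekTo("key1".getBytes())) {
      byte[] value = new byte[scanner.entry().getValueLength()];
      scanner.entry().getValue(value);
    }
    scanner.close();
    reader.close();
    in.close();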

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
new file mode 100644
index 0000000..003d9eb
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import org.junit.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
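+/**
+ * Tests for DTFile: writes a file of random key/value pairs and verifies
+ * reads through the shared CacheManager block cache.
+ */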
+public class DTFileTest
+{
+  private Configuration conf;
+  private Path path;
+  private FileSystem fs;
+  private NanoTimer timer;
+  private Random rng;
+  private RandomDistribution.DiscreteRNG keyLenGen;
+  private KVGenerator kvGen;
+
+
+  static class TestConf {
+    public int minWordLen = 5;
+    public int maxWordLen = 20;
+    public int dictSize = 1000;
+    int minKeyLen = 10;
+    int maxKeyLen = 50;
+    int minValLength = 100;
+    int maxValLength = 200;
+    int minBlockSize = 64 * 1024;
+    int fsOutputBufferSize = 1;
+    int fsInputBufferSize = 256 * 1024;
+    long fileSize = 3 * 1024 * 1024;
+    long seekCount = 1000;
+    String compress = "gz";
+
+  }
+
+  TestConf tconf = new TestConf();
+
+  public void setUp() throws IOException
+  {
+    conf = new Configuration();
+
+    conf.setInt("tfile.fs.input.buffer.size", tconf.fsInputBufferSize);
+    conf.setInt("tfile.fs.output.buffer.size", tconf.fsOutputBufferSize);
+    path = new Path("tmp/dtfile");
+    fs = path.getFileSystem(conf);
+    timer = new NanoTimer(false);
+    rng = new Random();
+    keyLenGen =
+        new RandomDistribution.Zipf(new Random(rng.nextLong()),
+            tconf.minKeyLen, tconf.maxKeyLen, 1.2);
+    RandomDistribution.DiscreteRNG valLenGen =
+        new RandomDistribution.Flat(new Random(rng.nextLong()),
+            tconf.minValLength, tconf.maxValLength);
+    RandomDistribution.DiscreteRNG wordLenGen =
+        new RandomDistribution.Flat(new Random(rng.nextLong()),
+            tconf.minWordLen, tconf.maxWordLen);
+    kvGen =
+        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
+            tconf.dictSize);
+  }
+
+
+  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
+      throws IOException {
+    if (fs.exists(name)) {
+      fs.delete(name, true);
+    }
+    FSDataOutputStream fout = fs.create(name);
+    return fout;
+  }
+
+  int tuples = 0;
+
+  private void writeTFile() throws IOException
+  {
+
+    FSDataOutputStream fout = createFSOutput(path, fs);
+    byte[] key = new byte[16];
+    ByteBuffer bb = ByteBuffer.wrap(key);
+    try {
+      DTFile.Writer writer =
+          new DTFile.Writer(fout, tconf.minBlockSize, tconf.compress, "memcmp",
+              conf);
+      try {
+        BytesWritable tmpKey = new BytesWritable();
+        BytesWritable val = new BytesWritable();
+        for (long i = 0; true; ++i) {
+          if (i % 1000 == 0) { // test the size for every 1000 rows.
+            if (fs.getFileStatus(path).getLen() >= tconf.fileSize) {
+              break;
+            }
+          }
+          bb.clear();
+          bb.putLong(i);
+          kvGen.next(tmpKey, val, false);
+          writer.append(key, 0, key.length, val.get(), 0, val
+              .getSize());
+          tuples++;
+        }
+      }
+      finally {
+        writer.close();
+      }
+    }
+    finally {
+      fout.close();
+    }
+
+    long fsize = fs.getFileStatus(path).getLen();
+
+    System.out.println("Total tuple wrote " + tuples + " File size " + fsize / (1024.0 * 1024));
+  }
+
+
+
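+  /**
+   * Verifies DTFile block caching through the shared CacheManager: a
+   * lowerBound() lookup loads one block into the cache, repeated lookups in
+   * the same block are cache hits, a lookup in a different block is a miss,
+   * and closing the reader evicts its cached blocks.
+   */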
+  @Test
+  public void seekDTFile() throws IOException
+  {
+    Random random = new Random();
+    int ikey = random.nextInt(tuples);
+    byte[] key = new byte[16];
+    ByteBuffer bb = ByteBuffer.wrap(key);
+    bb.putLong(ikey);
+
+    FSDataInputStream fsdis = fs.open(path);
+
+    if (CacheManager.getCache() != null) {
+      CacheManager.getCache().invalidateAll();
+    }
+    CacheManager.setEnableStats(true);
+    Assert.assertEquals("Cache Contains no block", CacheManager.getCacheSize(), 0);
+
+    DTFile.Reader reader = new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
+    DTFile.Reader.Scanner scanner = reader.createScanner();
+
+    /* Read first key in the file */
+    long numBlocks = CacheManager.getCacheSize();
+    scanner.lowerBound(key);
+    Assert.assertEquals("Cache contains some blocks ", numBlocks + 1, CacheManager.getCacheSize());
+
+    /* Reading the next key does not add a new block to the cache; it is served directly from the cache */
+    // close scanner, so that it does not use its own cache.
+    scanner.close();
+    ikey++;
+    bb.clear();
+    bb.putLong(ikey);
+
+    numBlocks = CacheManager.getCacheSize();
+    long hit = CacheManager.getCache().stats().hitCount();
+    scanner.lowerBound(key);
+    Assert.assertEquals("Cache contains some blocks ", CacheManager.getCacheSize(), numBlocks);
+    Assert.assertEquals("Cache hit ", CacheManager.getCache().stats().hitCount(), hit+1);
+
+    /* test cache miss */
+    scanner.close();
+    hit = CacheManager.getCache().stats().hitCount();
+    long oldmiss = CacheManager.getCache().stats().missCount();
+    ikey = tuples-1;
+    bb.clear();
+    bb.putLong(ikey);
+    numBlocks = CacheManager.getCacheSize();
+    scanner.lowerBound(key);
+    Assert.assertEquals("Cache contains one more blocks ", CacheManager.getCacheSize(), numBlocks + 1);
+    Assert.assertEquals("No cache hit ", CacheManager.getCache().stats().hitCount(), hit);
+    Assert.assertEquals("Cache miss", CacheManager.getCache().stats().missCount(), oldmiss + 1);
+
+    Assert.assertEquals("Reverse lookup cache and block cache has same number of entries",
+        reader.readerBCF.getCacheKeys().size(), CacheManager.getCacheSize());
+    reader.close();
+    Assert.assertEquals("Cache blocks are deleted on reader close ", CacheManager.getCacheSize(), 0);
+    Assert.assertEquals("Size of reverse lookup cache is zero ", 0, reader.readerBCF.getCacheKeys().size());
+  }
+
+  @Test
+  public void checkInvalidKeys()
+  {
+    /* invalidating non-existent keys does not throw an exception */
+    List<String> lst = new LinkedList<String>();
+    lst.add("One");
+    lst.add("Two");
+    CacheManager.getCache().invalidateAll(lst);
+  }
+
+  @Before
+  public void createDTfile() throws IOException
+  {
+    setUp();
+    writeTFile();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
new file mode 100644
index 0000000..49fedeb
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Writer;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+
+/**
+ * Tests basic DTFile features: reading and writing with various codecs,
+ * sorted and unsorted files, and meta blocks.
+ * 
+ */
+public class TestDTFile extends TestCase {
+  private static String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+  private FileSystem fs;
+  private Configuration conf;
+  private static final int minBlockSize = 512;
+  private static final int largeVal = 3 * 1024 * 1024;
+  private static final String localFormatter = "%010d";
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    // do nothing
+  }
+
+  // read a key from the scanner
+  public byte[] readKey(Scanner scanner) throws IOException {
+    int keylen = scanner.entry().getKeyLength();
+    byte[] read = new byte[keylen];
+    scanner.entry().getKey(read);
+    return read;
+  }
+
+  // read a value from the scanner
+  public byte[] readValue(Scanner scanner) throws IOException {
+    int valueLen = scanner.entry().getValueLength();
+    byte[] read = new byte[valueLen];
+    scanner.entry().getValue(read);
+    return read;
+  }
+
+  // read a long value from the scanner
+  public byte[] readLongValue(Scanner scanner, int len) throws IOException {
+    DataInputStream din = scanner.entry().getValueStream();
+    byte[] b = new byte[len];
+    din.readFully(b);
+    din.close();
+    return b;
+  }
+
+  // write some records into the tfile
+  // write them twice
+  private int writeSomeRecords(Writer writer, int start, int n)
+      throws IOException {
+    String value = "value";
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      writer.append(key.getBytes(), (value + key).getBytes());
+      writer.append(key.getBytes(), (value + key).getBytes());
+    }
+    return (start + n);
+  }
+
+  // read the records and check
+  private int readAndCheckbytes(Scanner scanner, int start, int n)
+      throws IOException {
+    String value = "value";
+    for (int i = start; i < (start + n); i++) {
+      byte[] key = readKey(scanner);
+      byte[] val = readValue(scanner);
+      String keyStr = String.format(localFormatter, i);
+      String valStr = value + keyStr;
+      assertTrue("btyes for keys do not match " + keyStr + " "
+          + new String(key), Arrays.equals(keyStr.getBytes(), key));
+      assertTrue("bytes for vals do not match " + valStr + " "
+          + new String(val), Arrays.equals(
+          valStr.getBytes(), val));
+      assertTrue(scanner.advance());
+      key = readKey(scanner);
+      val = readValue(scanner);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      assertTrue("bytes for vals do not match", Arrays.equals(
+          valStr.getBytes(), val));
+      assertTrue(scanner.advance());
+    }
+    return (start + n);
+  }
+
+  // write some large records
+  // write them twice
+  private int writeLargeRecords(Writer writer, int start, int n)
+      throws IOException {
+    byte[] value = new byte[largeVal];
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      writer.append(key.getBytes(), value);
+      writer.append(key.getBytes(), value);
+    }
+    return (start + n);
+  }
+
+  // read large records
+  // read them twice since each record is duplicated
+  private int readLargeRecords(Scanner scanner, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      byte[] key = readKey(scanner);
+      String keyStr = String.format(localFormatter, i);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      scanner.advance();
+      key = readKey(scanner);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  // write empty keys and values
+  private void writeEmptyRecords(Writer writer, int n) throws IOException {
+    byte[] key = new byte[0];
+    byte[] value = new byte[0];
+    for (int i = 0; i < n; i++) {
+      writer.append(key, value);
+    }
+  }
+
+  // read empty keys and values
+  private void readEmptyRecords(Scanner scanner, int n) throws IOException {
+    byte[] key = new byte[0];
+    byte[] value = new byte[0];
+    byte[] readKey = null;
+    byte[] readValue = null;
+    for (int i = 0; i < n; i++) {
+      readKey = readKey(scanner);
+      readValue = readValue(scanner);
+      assertTrue("failed to match keys", Arrays.equals(readKey, key));
+      assertTrue("failed to match values", Arrays.equals(readValue, value));
+      assertTrue("failed to advance cursor", scanner.advance());
+    }
+  }
+
+  private int writePrepWithKnownLength(Writer writer, int start, int n)
+      throws IOException {
+    // get the length of the key
+    String key = String.format(localFormatter, start);
+    int keyLen = key.getBytes().length;
+    String value = "value" + key;
+    int valueLen = value.getBytes().length;
+    for (int i = start; i < (start + n); i++) {
+      DataOutputStream out = writer.prepareAppendKey(keyLen);
+      String localKey = String.format(localFormatter, i);
+      out.write(localKey.getBytes());
+      out.close();
+      out = writer.prepareAppendValue(valueLen);
+      String localValue = "value" + localKey;
+      out.write(localValue.getBytes());
+      out.close();
+    }
+    return (start + n);
+  }
+
+  private int readPrepWithKnownLength(Scanner scanner, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      byte[] read = readKey(scanner);
+      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+      String value = "value" + key;
+      read = readValue(scanner);
+      assertTrue("values not equal", Arrays.equals(value.getBytes(), read));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  private int writePrepWithUnkownLength(Writer writer, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      DataOutputStream out = writer.prepareAppendKey(-1);
+      String localKey = String.format(localFormatter, i);
+      out.write(localKey.getBytes());
+      out.close();
+      String value = "value" + localKey;
+      out = writer.prepareAppendValue(-1);
+      out.write(value.getBytes());
+      out.close();
+    }
+    return (start + n);
+  }
+
+  private int readPrepWithUnknownLength(Scanner scanner, int start, int n)
+      throws IOException {
+    for (int i = start; i < start; i++) {
+      String key = String.format(localFormatter, i);
+      byte[] read = readKey(scanner);
+      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+      try {
+        read = readValue(scanner);
+        assertTrue(false);
+      }
+      catch (IOException ie) {
+        // should have thrown exception
+      }
+      String value = "value" + key;
+      read = readLongValue(scanner, value.getBytes().length);
+      assertTrue("values nto equal", Arrays.equals(read, value.getBytes()));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  private byte[] getSomeKey(int rowId) {
+    return String.format(localFormatter, rowId).getBytes();
+  }
+
+  private void writeRecords(Writer writer) throws IOException {
+    writeEmptyRecords(writer, 10);
+    int ret = writeSomeRecords(writer, 0, 100);
+    ret = writeLargeRecords(writer, ret, 1);
+    ret = writePrepWithKnownLength(writer, ret, 40);
+    ret = writePrepWithUnkownLength(writer, ret, 50);
+    writer.close();
+  }
+
+  private void readAllRecords(Scanner scanner) throws IOException {
+    readEmptyRecords(scanner, 10);
+    int ret = readAndCheckbytes(scanner, 0, 100);
+    ret = readLargeRecords(scanner, ret, 1);
+    ret = readPrepWithKnownLength(scanner, ret, 40);
+    ret = readPrepWithUnknownLength(scanner, ret, 50);
+  }
+
+  private FSDataOutputStream createFSOutput(Path name) throws IOException {
+    if (fs.exists(name)) fs.delete(name, true);
+    FSDataOutputStream fout = fs.create(name);
+    return fout;
+  }
+
+  /**
+   * test basic read/write and seek behavior with the given compression codec
+   */
+  void basicWithSomeCodec(String codec) throws IOException {
+    Path ncTFile = new Path(ROOT, "basic.tfile");
+    FSDataOutputStream fout = createFSOutput(ncTFile);
+    Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf);
+    writeRecords(writer);
+    fout.close();
+    FSDataInputStream fin = fs.open(ncTFile);
+    Reader reader =
+        new Reader(fs.open(ncTFile), fs.getFileStatus(ncTFile).getLen(), conf);
+
+    Scanner scanner = reader.createScanner();
+    readAllRecords(scanner);
+    scanner.seekTo(getSomeKey(50));
+    assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50)));
+    // read the key and see if it matches
+    byte[] readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50),
+        readKey));
+
+    scanner.seekTo(new byte[0]);
+    byte[] val1 = readValue(scanner);
+    scanner.seekTo(new byte[0]);
+    byte[] val2 = readValue(scanner);
+    assertTrue(Arrays.equals(val1, val2));
+    
+    // check for lowerBound
+    scanner.lowerBound(getSomeKey(50));
+    assertTrue("locaton lookup failed", scanner.currentLocation
+        .compareTo(reader.end()) < 0);
+    readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(readKey,
+        getSomeKey(50)));
+
+    // check for upper bound
+    scanner.upperBound(getSomeKey(50));
+    assertTrue("location lookup failed", scanner.currentLocation
+        .compareTo(reader.end()) < 0);
+    readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(readKey,
+        getSomeKey(51)));
+
+    scanner.close();
+    // test for a range of scanner
+    scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
+    readAndCheckbytes(scanner, 10, 50);
+    assertFalse(scanner.advance());
+    scanner.close();
+    reader.close();
+    fin.close();
+    fs.delete(ncTFile, true);
+  }
+
+  // unsorted with some codec
+  void unsortedWithSomeCodec(String codec) throws IOException {
+    Path uTfile = new Path(ROOT, "unsorted.tfile");
+    FSDataOutputStream fout = createFSOutput(uTfile);
+    Writer writer = new Writer(fout, minBlockSize, codec, null, conf);
+    writeRecords(writer);
+    writer.close();
+    fout.close();
+    FSDataInputStream fin = fs.open(uTfile);
+    Reader reader =
+        new Reader(fs.open(uTfile), fs.getFileStatus(uTfile).getLen(), conf);
+
+    Scanner scanner = reader.createScanner();
+    readAllRecords(scanner);
+    scanner.close();
+    reader.close();
+    fin.close();
+    fs.delete(uTfile, true);
+  }
+
+  public void testTFileFeatures() throws IOException {
+    basicWithSomeCodec("none");
+    basicWithSomeCodec("gz");
+  }
+
+  // test unsorted tfiles.
+  public void testUnsortedTFileFeatures() throws IOException {
+    unsortedWithSomeCodec("none");
+    unsortedWithSomeCodec("gz");
+  }
+
+  private void writeNumMetablocks(Writer writer, String compression, int n)
+      throws IOException {
+    for (int i = 0; i < n; i++) {
+      DataOutputStream dout =
+          writer.prepareMetaBlock("TfileMeta" + i, compression);
+      byte[] b = ("something to test" + i).getBytes();
+      dout.write(b);
+      dout.close();
+    }
+  }
+
+  private void someTestingWithMetaBlock(Writer writer, String compression)
+      throws IOException {
+    DataOutputStream dout = null;
+    writeNumMetablocks(writer, compression, 10);
+    try {
+      dout = writer.prepareMetaBlock("TfileMeta1", compression);
+      assertTrue(false);
+    }
+    catch (MetaBlockAlreadyExists me) {
+      // expected: a meta block with this name already exists
+    }
+    dout = writer.prepareMetaBlock("TFileMeta100", compression);
+    dout.close();
+  }
+
+  private void readNumMetablocks(Reader reader, int n) throws IOException {
+    int len = ("something to test" + 0).getBytes().length;
+    for (int i = 0; i < n; i++) {
+      DataInputStream din = reader.getMetaBlock("TfileMeta" + i);
+      byte b[] = new byte[len];
+      din.readFully(b);
+      assertTrue("faield to match metadata", Arrays.equals(
+          ("something to test" + i).getBytes(), b));
+      din.close();
+    }
+  }
+
+  private void someReadingWithMetaBlock(Reader reader) throws IOException {
+    DataInputStream din = null;
+    readNumMetablocks(reader, 10);
+    try {
+      din = reader.getMetaBlock("NO ONE");
+      assertTrue(false);
+    }
+    catch (MetaBlockDoesNotExist me) {
+      // should catch
+    }
+    din = reader.getMetaBlock("TFileMeta100");
+    int read = din.read();
+    assertTrue("check for status", (read == -1));
+    din.close();
+  }
+
+  // test meta blocks for tfiles
+  public void _testMetaBlocks() throws IOException {
+    Path mFile = new Path(ROOT, "meta.tfile");
+    FSDataOutputStream fout = createFSOutput(mFile);
+    Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
+    someTestingWithMetaBlock(writer, "none");
+    writer.close();
+    fout.close();
+    FSDataInputStream fin = fs.open(mFile);
+    Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
+    someReadingWithMetaBlock(reader);
+    fs.delete(mFile, true);
+    reader.close();
+    fin.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
new file mode 100644
index 0000000..a1fa5c8
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Writer;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Location;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * 
+ * Byte-array test case class using the GZ compression codec; base class for
+ * the none- and LZO-compression test classes.
+ * 
+ */
+public class TestDTFileByteArrays {
+  private static String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+  private final static int BLOCK_SIZE = 512;
+  private final static int BUF_SIZE = 64;
+  private final static int K = 1024;
+  protected boolean skip = false;
+
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+
+  private FileSystem fs;
+  private Configuration conf = new Configuration();
+  private Path path;
+  private FSDataOutputStream out;
+  private Writer writer;
+
+  private String compression = Compression.Algorithm.GZ.getName();
+  private String comparator = "memcmp";
+  private final String outputFile = getClass().getSimpleName();
+
+  /*
+   * pre-sampled numbers of records in one block, based on the generated key
+   * and value strings. The counts differ slightly depending on whether the
+   * native zlib library is present.
+   */
+  private boolean usingNative = ZlibFactory.isNativeZlibLoaded(conf);
+  private int records1stBlock = usingNative ? 5674 : 4480;
+  private int records2ndBlock = usingNative ? 5574 : 4263;
+
+  public void init(String compression, String comparator,
+      int numRecords1stBlock, int numRecords2ndBlock) {
+    init(compression, comparator);
+    this.records1stBlock = numRecords1stBlock;
+    this.records2ndBlock = numRecords2ndBlock;
+  }
+  
+  public void init(String compression, String comparator) {
+    this.compression = compression;
+    this.comparator = comparator;
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    path = new Path(ROOT, outputFile);
+    fs = path.getFileSystem(conf);
+    out = fs.create(path);
+    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (!skip)
+      fs.delete(path, true);
+  }
+
+  @Test
+  public void testNoDataEntry() throws IOException {
+    if (skip) 
+      return;
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Assert.assertTrue(reader.isSorted());
+    Scanner scanner = reader.createScanner();
+    Assert.assertTrue(scanner.atEnd());
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  public void testOneDataEntry() throws IOException {
+    if (skip)
+      return;
+    writeRecords(1);
+    readRecords(1);
+
+    checkBlockIndex(0, 0);
+    readValueBeforeKey(0);
+    readKeyWithoutValue(0);
+    readValueWithoutKey(0);
+    readKeyManyTimes(0);
+  }
+
+  @Test
+  public void testTwoDataEntries() throws IOException {
+    if (skip)
+      return;
+    writeRecords(2);
+    readRecords(2);
+  }
+
+  /**
+   * Fill up exactly one block.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testOneBlock() throws IOException {
+    if (skip)
+      return;
+    // just under one block
+    writeRecords(records1stBlock);
+    readRecords(records1stBlock);
+    // last key should be in the first block (block 0)
+    checkBlockIndex(records1stBlock - 1, 0);
+  }
+
+  /**
+   * One block plus one record.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testOneBlockPlusOneEntry() throws IOException {
+    if (skip)
+      return;
+    writeRecords(records1stBlock + 1);
+    readRecords(records1stBlock + 1);
+    checkBlockIndex(records1stBlock - 1, 0);
+    checkBlockIndex(records1stBlock, 1);
+  }
+
+  @Test
+  public void testTwoBlocks() throws IOException {
+    if (skip)
+      return;
+    writeRecords(records1stBlock + 5);
+    readRecords(records1stBlock + 5);
+    checkBlockIndex(records1stBlock + 4, 1);
+  }
+
+  @Test
+  public void testThreeBlocks() throws IOException {
+    if (skip) 
+      return;
+    writeRecords(2 * records1stBlock + 5);
+    readRecords(2 * records1stBlock + 5);
+
+    checkBlockIndex(2 * records1stBlock + 4, 2);
+    // 1st key in file
+    readValueBeforeKey(0);
+    readKeyWithoutValue(0);
+    readValueWithoutKey(0);
+    readKeyManyTimes(0);
+    // last key in file
+    readValueBeforeKey(2 * records1stBlock + 4);
+    readKeyWithoutValue(2 * records1stBlock + 4);
+    readValueWithoutKey(2 * records1stBlock + 4);
+    readKeyManyTimes(2 * records1stBlock + 4);
+
+    // 1st key in mid block, verify block indexes then read
+    checkBlockIndex(records1stBlock - 1, 0);
+    checkBlockIndex(records1stBlock, 1);
+    readValueBeforeKey(records1stBlock);
+    readKeyWithoutValue(records1stBlock);
+    readValueWithoutKey(records1stBlock);
+    readKeyManyTimes(records1stBlock);
+
+    // last key in mid block, verify block indexes then read
+    checkBlockIndex(records1stBlock + records2ndBlock
+        - 1, 1);
+    checkBlockIndex(records1stBlock + records2ndBlock, 2);
+    readValueBeforeKey(records1stBlock
+        + records2ndBlock - 1);
+    readKeyWithoutValue(records1stBlock
+        + records2ndBlock - 1);
+    readValueWithoutKey(records1stBlock
+        + records2ndBlock - 1);
+    readKeyManyTimes(records1stBlock + records2ndBlock
+        - 1);
+
+    // mid in mid block
+    readValueBeforeKey(records1stBlock + 10);
+    readKeyWithoutValue(records1stBlock + 10);
+    readValueWithoutKey(records1stBlock + 10);
+    readKeyManyTimes(records1stBlock + 10);
+  }
+
+  Location locate(Scanner scanner, byte[] key) throws IOException {
+    if (scanner.seekTo(key)) {
+      return scanner.currentLocation;
+    }
+    return scanner.endLocation;
+  }
+  
+  @Test
+  public void testLocate() throws IOException {
+    if (skip)
+      return;
+    writeRecords(3 * records1stBlock);
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    locate(scanner, composeSortedKey(KEY, 2).getBytes());
+    locate(scanner, composeSortedKey(KEY, records1stBlock - 1).getBytes());
+    locate(scanner, composeSortedKey(KEY, records1stBlock).getBytes());
+    Location locX = locate(scanner, "keyX".getBytes());
+    Assert.assertEquals(scanner.endLocation, locX);
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  public void testFailureWriterNotClosed() throws IOException {
+    if (skip)
+      return;
+    Reader reader = null;
+    try {
+      reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+      Assert.fail("Cannot read before closing the writer.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  @Test
+  public void testFailureWriteMetaBlocksWithSameName() throws IOException {
+    if (skip)
+      return;
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("foo".getBytes());
+    outMeta.close();
+    // add the same metablock
+    try {
+      writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+      Assert.fail("Cannot create metablocks with the same name.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureGetNonExistentMetaBlock() throws IOException {
+    if (skip)
+      return;
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("foo".getBytes());
+    outMeta.close();
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    DataInputStream mb = reader.getMetaBlock("testX");
+    Assert.assertNotNull(mb);
+    mb.close();
+    try {
+      DataInputStream mbBad = reader.getMetaBlock("testY");
+      Assert.fail("Error on handling non-existent metablocks.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testFailureWriteRecordAfterMetaBlock() throws IOException {
+    if (skip)
+      return;
+    // write a key/value first
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("dummy".getBytes());
+    outMeta.close();
+    // add more key/value
+    try {
+      writer.append("keyY".getBytes(), "valueY".getBytes());
+      Assert.fail("Cannot add key/value after start adding meta blocks.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureReadValueManyTimes() throws IOException {
+    if (skip)
+      return;
+    writeRecords(5);
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+
+    byte[] vbuf = new byte[BUF_SIZE];
+    int vlen = scanner.entry().getValueLength();
+    scanner.entry().getValue(vbuf);
+    Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0);
+    try {
+      scanner.entry().getValue(vbuf);
+      Assert.fail("Cannot get the value mlutiple times.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  public void testFailureBadCompressionCodec() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    out = fs.create(path);
+    try {
+      writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
+      Assert.fail("Error on handling invalid compression codecs.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+      // e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testFailureOpenEmptyFile() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    // create an absolutely empty file
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    out = fs.create(path);
+    out.close();
+    try {
+      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+      Assert.fail("Error on handling empty files.");
+    } catch (EOFException e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  @Test
+  public void testFailureOpenRandomFile() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    // create a random file
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    out = fs.create(path);
+    Random rand = new Random();
+    byte[] buf = new byte[K];
+    // fill with > 1MB data
+    for (int nx = 0; nx < K + 2; nx++) {
+      rand.nextBytes(buf);
+      out.write(buf);
+    }
+    out.close();
+    try {
+      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+      Assert.fail("Error on handling random files.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  @Test
+  public void testFailureKeyLongerThan64K() throws IOException {
+    if (skip)
+      return;
+    byte[] buf = new byte[64 * K + 1];
+    Random rand = new Random();
+    rand.nextBytes(buf);
+    try {
+      writer.append(buf, "valueX".getBytes());
+    } catch (IndexOutOfBoundsException e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureOutOfOrderKeys() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyM".getBytes(), "valueM".getBytes());
+      writer.append("keyA".getBytes(), "valueA".getBytes());
+      Assert.fail("Error on handling out of order keys.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+      // e.printStackTrace();
+    }
+
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureNegativeOffset() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
+      Assert.fail("Error on handling negative offset.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureNegativeOffset_2() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    try {
+      scanner.lowerBound("keyX".getBytes(), -1, 4);
+      Assert.fail("Error on handling negative offset.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    } finally {
+      reader.close();
+      scanner.close();
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureNegativeLength() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
+      Assert.fail("Error on handling negative length.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureNegativeLength_2() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    try {
+      scanner.lowerBound("keyX".getBytes(), 0, -1);
+      Assert.fail("Error on handling negative length.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    } finally {
+      scanner.close();
+      reader.close();
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureNegativeLength_3() throws IOException {
+    if (skip)
+      return;
+    writeRecords(3);
+
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    try {
+      // test negative array offset
+      try {
+        scanner.seekTo("keyY".getBytes(), -1, 4);
+        Assert.fail("Failed to handle negative offset.");
+      } catch (Exception e) {
+        // noop, expecting exceptions
+      }
+
+      // test negative array length
+      try {
+        scanner.seekTo("keyY".getBytes(), 0, -2);
+        Assert.fail("Failed to handle negative key length.");
+      } catch (Exception e) {
+        // noop, expecting exceptions
+      }
+    } finally {
+      reader.close();
+      scanner.close();
+    }
+  }
+
+  @Test
+  public void testFailureCompressionNotWorking() throws IOException {
+    if (skip)
+      return;
+    long rawDataSize = writeRecords(10 * records1stBlock, false);
+    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
+      Assert.assertTrue(out.getPos() < rawDataSize);
+    }
+    closeOutput();
+  }
+
+  @Test
+  public void testFailureFileWriteNotAt0Position() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    out = fs.create(path);
+    out.write(123);
+
+    try {
+      writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+      Assert.fail("Failed to catch file write not at position 0.");
+    } catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  private long writeRecords(int count) throws IOException {
+    return writeRecords(count, true);
+  }
+
+  private long writeRecords(int count, boolean close) throws IOException {
+    long rawDataSize = writeRecords(writer, count);
+    if (close) {
+      closeOutput();
+    }
+    return rawDataSize;
+  }
+
+  static long writeRecords(Writer writer, int count) throws IOException {
+    long rawDataSize = 0;
+    int nx;
+    for (nx = 0; nx < count; nx++) {
+      byte[] key = composeSortedKey(KEY, nx).getBytes();
+      byte[] value = (VALUE + nx).getBytes();
+      writer.append(key, value);
+      rawDataSize +=
+          WritableUtils.getVIntSize(key.length) + key.length
+              + WritableUtils.getVIntSize(value.length) + value.length;
+    }
+    return rawDataSize;
+  }
+
+  /**
+   * Insert leading 0's in front of the value so that generated keys sort correctly.
+   * 
+   * @param prefix prefix
+   * @param value  value
+   * @return sorted key
+   */
+  static String composeSortedKey(String prefix, int value) {
+    return String.format("%s%010d", prefix, value);
+  }
+
+  private void readRecords(int count) throws IOException {
+    readRecords(fs, path, count, conf);
+  }
+
+  static void readRecords(FileSystem fs, Path path, int count,
+      Configuration conf) throws IOException {
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+
+    try {
+      for (int nx = 0; nx < count; nx++, scanner.advance()) {
+        Assert.assertFalse(scanner.atEnd());
+        // Assert.assertTrue(scanner.next());
+
+        byte[] kbuf = new byte[BUF_SIZE];
+        int klen = scanner.entry().getKeyLength();
+        scanner.entry().getKey(kbuf);
+        Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
+            nx));
+
+        byte[] vbuf = new byte[BUF_SIZE];
+        int vlen = scanner.entry().getValueLength();
+        scanner.entry().getValue(vbuf);
+        Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx);
+      }
+
+      Assert.assertTrue(scanner.atEnd());
+      Assert.assertFalse(scanner.advance());
+    } finally {
+      scanner.close();
+      reader.close();
+    }
+  }
+
+  private void checkBlockIndex(int recordIndex, int blockIndexExpected) throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    scanner.seekTo(composeSortedKey(KEY, recordIndex).getBytes());
+    Assert.assertEquals(blockIndexExpected, scanner.currentLocation
+        .getBlockIndex());
+    scanner.close();
+    reader.close();
+  }
+
+  private void readValueBeforeKey(int recordIndex)
+      throws IOException {
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner =
+        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
+            .getBytes(), null);
+
+    try {
+      byte[] vbuf = new byte[BUF_SIZE];
+      int vlen = scanner.entry().getValueLength();
+      scanner.entry().getValue(vbuf);
+      Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex);
+
+      byte[] kbuf = new byte[BUF_SIZE];
+      int klen = scanner.entry().getKeyLength();
+      scanner.entry().getKey(kbuf);
+      Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
+          recordIndex));
+    } finally {
+      scanner.close();
+      reader.close();
+    }
+  }
+
+  private void readKeyWithoutValue(int recordIndex)
+      throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner =
+        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
+            .getBytes(), null);
+
+    try {
+      // read the indexed key
+      byte[] kbuf1 = new byte[BUF_SIZE];
+      int klen1 = scanner.entry().getKeyLength();
+      scanner.entry().getKey(kbuf1);
+      Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+          recordIndex));
+
+      if (scanner.advance() && !scanner.atEnd()) {
+        // read the next key following the indexed
+        byte[] kbuf2 = new byte[BUF_SIZE];
+        int klen2 = scanner.entry().getKeyLength();
+        scanner.entry().getKey(kbuf2);
+        Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY,
+            recordIndex + 1));
+      }
+    } finally {
+      scanner.close();
+      reader.close();
+    }
+  }
+
+  private void readValueWithoutKey(int recordIndex)
+      throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+
+    Scanner scanner =
+        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
+            .getBytes(), null);
+
+    byte[] vbuf1 = new byte[BUF_SIZE];
+    int vlen1 = scanner.entry().getValueLength();
+    scanner.entry().getValue(vbuf1);
+    Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex);
+
+    if (scanner.advance() && !scanner.atEnd()) {
+      byte[] vbuf2 = new byte[BUF_SIZE];
+      int vlen2 = scanner.entry().getValueLength();
+      scanner.entry().getValue(vbuf2);
+      Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE
+          + (recordIndex + 1));
+    }
+
+    scanner.close();
+    reader.close();
+  }
+
+  private void readKeyManyTimes(int recordIndex) throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+
+    Scanner scanner =
+        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
+            .getBytes(), null);
+
+    // read the indexed key
+    byte[] kbuf1 = new byte[BUF_SIZE];
+    int klen1 = scanner.entry().getKeyLength();
+    scanner.entry().getKey(kbuf1);
+    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+        recordIndex));
+
+    klen1 = scanner.entry().getKeyLength();
+    scanner.entry().getKey(kbuf1);
+    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+        recordIndex));
+
+    klen1 = scanner.entry().getKeyLength();
+    scanner.entry().getKey(kbuf1);
+    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+        recordIndex));
+
+    scanner.close();
+    reader.close();
+  }
+
+  private void closeOutput() throws IOException {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+    if (out != null) {
+      out.close();
+      out = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
new file mode 100644
index 0000000..c313813
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.file.tfile.DTFile.Writer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestTFileComparator2 {
+  private static final String ROOT = System.getProperty("test.build.data",
+      "/tmp/tfile-test");
+  private static final String name = "test-tfile-comparator2";
+  private final static int BLOCK_SIZE = 512;
+  private static final String VALUE = "value";
+  private static final String jClassLongWritableComparator = "jclass:"
+      + LongWritable.Comparator.class.getName();
+  private static final long NENTRY = 10000;
+
+  private static long cube(long n) {
+    return n*n*n;
+  }
+  
+  private static String buildValue(long i) {
+    return String.format("%s-%d", VALUE, i);
+  }
+  
+  @Test
+  public void testSortedLongWritable() throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path(ROOT, name);
+    FileSystem fs = path.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(path);
+    try {
+      DTFile.Writer writer = new Writer(out, BLOCK_SIZE, "gz",
+          jClassLongWritableComparator, conf);
+      try {
+        LongWritable key = new LongWritable(0);
+        for (long i=0; i<NENTRY; ++i) {
+          key.set(cube(i-NENTRY/2));
+          DataOutputStream dos = writer.prepareAppendKey(-1);
+          try {
+            key.write(dos);
+          } finally {
+            dos.close();
+          }
+          dos = writer.prepareAppendValue(-1);
+          try {
+            dos.write(buildValue(i).getBytes());
+          } finally {
+            dos.close();
+          }
+        }
+      } finally {
+        writer.close();
+      } 
+    } finally {
+      out.close();
+    }
+    
+    FSDataInputStream in = fs.open(path);
+    try {
+      DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path)
+          .getLen(), conf);
+      try {
+        DTFile.Reader.Scanner scanner = reader.createScanner();
+        long i=0;
+        BytesWritable value = new BytesWritable();
+        for (; !scanner.atEnd(); scanner.advance()) {
+          scanner.entry().getValue(value);
+          assertEquals(buildValue(i), new String(value.getBytes(), 0, value
+              .getLength()));
+          ++i;
+        }
+      } finally {
+        reader.close();
+      }
+    } finally {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
new file mode 100644
index 0000000..0a10468
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.DTFile.Writer;
+import org.junit.Assert;
+
+/**
+ * 
+ * Test cases for TFile comparator handling: invalid comparator names,
+ * non-existent jclass comparators, and classes that are not RawComparators.
+ * 
+ */
+public class TestTFileComparators extends TestCase {
+  private static String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+
+  private final static int BLOCK_SIZE = 512;
+  private FileSystem fs;
+  private Configuration conf;
+  private Path path;
+  private FSDataOutputStream out;
+  private Writer writer;
+
+  private String compression = Compression.Algorithm.GZ.getName();
+  private String outputFile = "TFileTestComparators";
+  /*
+   * pre-sampled numbers of records in one block, based on the generated
+   * key and value strings
+   */
+  // private int records1stBlock = 4314;
+  // private int records2ndBlock = 4108;
+  private int records1stBlock = 4480;
+  private int records2ndBlock = 4263;
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    path = new Path(ROOT, outputFile);
+    fs = path.getFileSystem(conf);
+    out = fs.create(path);
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    fs.delete(path, true);
+  }
+
+  // bad comparator format
+  public void testFailureBadComparatorNames() throws IOException {
+    try {
+      writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+      e.printStackTrace();
+    }
+  }
+
+  // jclass that doesn't exist
+  public void testFailureBadJClassNames() throws IOException {
+    try {
+      writer =
+          new Writer(out, BLOCK_SIZE, compression,
+              "jclass: some.non.existence.clazz", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+      e.printStackTrace();
+    }
+  }
+
+  // class exists but not a RawComparator
+  public void testFailureBadJClasses() throws IOException {
+    try {
+      writer =
+          new Writer(out, BLOCK_SIZE, compression,
+              "jclass:org.apache.hadoop.io.file.tfile.Chunk", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+      e.printStackTrace();
+    }
+  }
+
+  private void closeOutput() throws IOException {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+    if (out != null) {
+      out.close();
+      out = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
new file mode 100644
index 0000000..301cffc
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Byte arrays test case using the GZ compression codec together with a
+ * user-supplied (jclass) RawComparator.
+ */
+
+public class TestTFileJClassComparatorByteArrays extends TestDTFileByteArrays {
+  /**
+   * Test the GZ compression codec with a jclass comparator, using the same
+   * test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.GZ.getName(),
+        "jclass: org.apache.hadoop.io.file.tfile.MyComparator");
+    super.setUp();
+  }
+}
+
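+// Lexicographic byte-array comparator backed by WritableComparator.compareBytes;
+// it is loaded by class name through the "jclass:" comparator specification
+// used in setUp() above.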
+class MyComparator implements RawComparator<byte[]>, Serializable {
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+  }
+
+  @Override
+  public int compare(byte[] o1, byte[] o2) {
+    return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
new file mode 100644
index 0000000..20cff9e
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsByteArrays extends TestDTFileByteArrays {
+  /**
+   * Test LZO compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    skip = !(Algorithm.LZO.isSupported());
+    if (skip) {
+      System.out.println("Skipped");
+    }
+
+    // TODO: sample the generated key/value records, and put the numbers below
+    init(Compression.Algorithm.LZO.getName(), "memcmp", 2605, 2558);
+    if (!skip) {
+      super.setUp();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
new file mode 100644
index 0000000..7c6581d
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsStreams extends TestTFileStreams {
+  /**
+   * Test the LZO compression codec, using the same test cases as in the Streams.
+   */
+  @Override
+  public void setUp() throws IOException {
+    skip = !(Algorithm.LZO.isSupported());
+    if (skip) {
+      System.out.println("Skipped");
+    }
+    init(Compression.Algorithm.LZO.getName(), "memcmp");
+    if (!skip) {
+      super.setUp();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
new file mode 100644
index 0000000..c304743
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+public class TestTFileNoneCodecsByteArrays extends TestDTFileByteArrays {
+  /**
+   * Test non-compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.NONE.getName(), "memcmp", 24, 24);
+    super.setUp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
new file mode 100644
index 0000000..31e3cad
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+/**
+ * Byte arrays test case using no compression codec together with a
+ * user-supplied (jclass) RawComparator.
+ */
+
+public class TestTFileNoneCodecsJClassComparatorByteArrays extends TestDTFileByteArrays {
+  /**
+   * Test non-compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.NONE.getName(),
+        "jclass: org.apache.hadoop.io.file.tfile.MyComparator", 24, 24);
+    super.setUp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
new file mode 100644
index 0000000..06d086b
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+public class TestTFileNoneCodecsStreams extends TestTFileStreams {
+  /**
+   * Test the non-compression codec, using the same test cases as in the Streams.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.NONE.getName(), "memcmp");
+    super.setUp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
new file mode 100644
index 0000000..9f6b3ce
--- /dev/null
+++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Writer;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+
+/**
+ * Measures seek performance: writes a DTFile of random key/value records and
+ * then times random lowerBound seeks against it.
+ */
+public class TestTFileSeek extends TestCase { 
+  private MyOptions options;
+  private Configuration conf;
+  private Path path;
+  private FileSystem fs;
+  private NanoTimer timer;
+  private Random rng;
+  private DiscreteRNG keyLenGen;
+  private KVGenerator kvGen;
+
+  @Override
+  public void setUp() throws IOException {
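+    // When run as a plain JUnit test (rather than from main), fall back to
+    // the default options.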
+    if (options == null) {
+      options = new MyOptions(new String[0]);
+    }
+
+    conf = new Configuration();
+    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
+    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
+    path = new Path(new Path(options.rootDir), options.file);
+    fs = path.getFileSystem(conf);
+    timer = new NanoTimer(false);
+    rng = new Random(options.seed);
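+    // Key lengths are drawn from a Zipf distribution; value and dictionary
+    // word lengths are drawn from flat (uniform) distributions.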
+    keyLenGen =
+        new RandomDistribution.Zipf(new Random(rng.nextLong()),
+            options.minKeyLen, options.maxKeyLen, 1.2);
+    DiscreteRNG valLenGen =
+        new RandomDistribution.Flat(new Random(rng.nextLong()),
+            options.minValLength, options.maxValLength);
+    DiscreteRNG wordLenGen =
+        new RandomDistribution.Flat(new Random(rng.nextLong()),
+            options.minWordLen, options.maxWordLen);
+    kvGen =
+        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
+            options.dictSize);
+  }
+  
+  @Override
+  public void tearDown() throws IOException {
+    fs.delete(path, true);
+  }
+  
+  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
+    throws IOException {
+    if (fs.exists(name)) {
+      fs.delete(name, true);
+    }
+    FSDataOutputStream fout = fs.create(name);
+    return fout;
+  }
+
+  private void createTFile() throws IOException {
+    long totalBytes = 0;
+    FSDataOutputStream fout = createFSOutput(path, fs);
+    try {
+      Writer writer =
+          new Writer(fout, options.minBlockSize, options.compress, "memcmp",
+              conf);
+      try {
+        BytesWritable key = new BytesWritable();
+        BytesWritable val = new BytesWritable();
+        timer.start();
+        for (long i = 0; true; ++i) {
+          if (i % 1000 == 0) { // test the size for every 1000 rows.
+            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
+              break;
+            }
+          }
+          kvGen.next(key, val, false);
+          writer.append(key.get(), 0, key.getSize(), val.get(), 0, val
+              .getSize());
+          totalBytes += key.getSize();
+          totalBytes += val.getSize();
+        }
+        timer.stop();
+      }
+      finally {
+        writer.close();
+      }
+    }
+    finally {
+      fout.close();
+    }
+    double duration = (double)timer.read()/1000; // in us.
+    long fsize = fs.getFileStatus(path).getLen();
+
+    System.out.printf(
+        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
+        timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
+            / duration);
+    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
+        timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
+  }
+  
+  public void seekTFile() throws IOException {
+    int miss = 0;
+    long totalBytes = 0;
+    FSDataInputStream fsdis = fs.open(path);
+    Reader reader =
+      new Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
+    KeySampler kSampler =
+        new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
+            keyLenGen);
+    Scanner scanner = reader.createScanner();
+    BytesWritable key = new BytesWritable();
+    BytesWritable val = new BytesWritable();
+    timer.reset();
+    timer.start();
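+    // Each iteration samples a random key within the file's key range and
+    // positions the scanner at the first entry >= that key; reaching the end
+    // of the file counts as a miss.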
+    for (int i = 0; i < options.seekCount; ++i) {
+      kSampler.next(key);
+      scanner.lowerBound(key.get(), 0, key.getSize());
+      if (!scanner.atEnd()) {
+        scanner.entry().get(key, val);
+        totalBytes += key.getSize();
+        totalBytes += val.getSize();
+      }
+      else {
+        ++miss;
+      }
+    }
+    timer.stop();
+    double duration = (double) timer.read() / 1000; // in us.
+    System.out.printf(
+        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
+        timer.toString(), NanoTimer.nanoTimeToString(timer.read()
+            / options.seekCount), options.seekCount - miss, miss,
+        (double) totalBytes / 1024 / (options.seekCount - miss));
+
+  }
+  
+  public void testSeeks() throws IOException {
+    String[] supported = TFile.getSupportedCompressionAlgorithms();
+    boolean proceed = false;
+    for (String c : supported) {
+      if (c.equals(options.compress)) {
+        proceed = true;
+        break;
+      }
+    }
+
+    if (!proceed) {
+      System.out.println("Skipped for " + options.compress);
+      return;
+    }
+
+    if (options.doCreate()) {
+      createTFile();
+    }
+
+    if (options.doRead()) {
+      seekTFile();
+    }
+  }
+  
+  private static class IntegerRange {
+    private final int from, to;
+
+    public IntegerRange(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    public static IntegerRange parse(String s) throws ParseException {
+      StringTokenizer st = new StringTokenizer(s, " \t,");
+      if (st.countTokens() != 2) {
+        throw new ParseException("Bad integer specification: " + s);
+      }
+      int from = Integer.parseInt(st.nextToken());
+      int to = Integer.parseInt(st.nextToken());
+      return new IntegerRange(from, to);
+    }
+
+    public int from() {
+      return from;
+    }
+
+    public int to() {
+      return to;
+    }
+  }
+
+  private static class MyOptions {
+    // hard coded constants
+    int dictSize = 1000;
+    int minWordLen = 5;
+    int maxWordLen = 20;
+    int osInputBufferSize = 64 * 1024;
+    int osOutputBufferSize = 64 * 1024;
+    int fsInputBufferSizeNone = 0;
+    int fsInputBufferSizeLzo = 0;
+    int fsInputBufferSizeGz = 0;
+    int fsOutputBufferSizeNone = 1;
+    int fsOutputBufferSizeLzo = 1;
+    int fsOutputBufferSizeGz = 1;
+   
+    String rootDir =
+        System.getProperty("test.build.data", "/tmp/tfile-test");
+    String file = "TestTFileSeek";
+    String compress = "gz";
+    int minKeyLen = 10;
+    int maxKeyLen = 50;
+    int minValLength = 100;
+    int maxValLength = 200;
+    int minBlockSize = 64 * 1024;
+    int fsOutputBufferSize = 1;
+    int fsInputBufferSize = 0;
+    long fileSize = 3 * 1024 * 1024;
+    long seekCount = 1000;
+    long seed;
+
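+    // Bit flags selecting which phases to run; both are enabled by default
+    // and can be narrowed with the -x option (r, w, or rw).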
+    static final int OP_CREATE = 1;
+    static final int OP_READ = 2;
+    int op = OP_CREATE | OP_READ;
+
+    boolean proceed = false;
+
+    public MyOptions(String[] args) {
+      seed = System.nanoTime();
+
+      try {
+        Options opts = buildOptions();
+        CommandLineParser parser = new GnuParser();
+        CommandLine line = parser.parse(opts, args, true);
+        processOptions(line, opts);
+        validateOptions();
+      }
+      catch (ParseException e) {
+        System.out.println(e.getMessage());
+        System.out.println("Try \"--help\" option for details.");
+        setStopProceed();
+      }
+    }
+
+    public boolean proceed() {
+      return proceed;
+    }
+
+    private Options buildOptions() {
+      Option compress =
+          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
+              .hasArg().withDescription("compression scheme").create('c');
+
+      Option fileSize =
+          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
+              .hasArg().withDescription("target size of the file (in MB).")
+              .create('s');
+
+      Option fsInputBufferSz =
+          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
+              .hasArg().withDescription(
+                  "size of the file system input buffer (in bytes).").create(
+                  'i');
+
+      Option fsOutputBufferSize =
+          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
+              .hasArg().withDescription(
+                  "size of the file system output buffer (in bytes).").create(
+                  'o');
+
+      Option keyLen =
+          OptionBuilder
+              .withLongOpt("key-length")
+              .withArgName("min,max")
+              .hasArg()
+              .withDescription(
+                  "the length range of the key (in bytes)")
+              .create('k');
+
+      Option valueLen =
+          OptionBuilder
+              .withLongOpt("value-length")
+              .withArgName("min,max")
+              .hasArg()
+              .withDescription(
+                  "the length range of the value (in bytes)")
+              .create('v');
+
+      Option blockSz =
+          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
+              .withDescription("minimum block size (in KB)").create('b');
+
+      Option seed =
+          OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
+              .withDescription("specify the seed").create('S');
+
+      Option operation =
+          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
+              .withDescription(
+                  "action: seek-only, create-only, seek-after-create").create(
+                  'x');
+
+      Option rootDir =
+          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
+              .withDescription(
+                  "specify root directory where files will be created.")
+              .create('r');
+
+      Option file =
+          OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
+              .withDescription("specify the file name to be created or read.")
+              .create('f');
+
+      Option seekCount =
+          OptionBuilder
+              .withLongOpt("seek")
+              .withArgName("count")
+              .hasArg()
+              .withDescription(
+                  "specify how many seek operations we perform (requires -x r or -x rw.")
+              .create('n');
+
+      Option help =
+          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
+              "show this screen").create("h");
+
+      return new Options().addOption(compress).addOption(fileSize).addOption(
+          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
+          .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption(
+              operation).addOption(seekCount).addOption(file).addOption(help);
+
+    }
+
+    private void processOptions(CommandLine line, Options opts)
+        throws ParseException {
+      // --help (-h) must be processed first.
+      if (line.hasOption('h')) {
+        HelpFormatter formatter = new HelpFormatter();
+        System.out.println("DTFile seek benchmark.");
+        System.out.println();
+        formatter.printHelp(100,
+            "java ... TestTFileSeek [options]",
+            "\nSupported options:", opts, "");
+        return;
+      }
+
+      if (line.hasOption('c')) {
+        compress = line.getOptionValue('c');
+      }
+
+      if (line.hasOption('d')) {
+        dictSize = Integer.parseInt(line.getOptionValue('d'));
+      }
+
+      if (line.hasOption('s')) {
+        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
+      }
+
+      if (line.hasOption('i')) {
+        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
+      }
+
+      if (line.hasOption('o')) {
+        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
+      }
+      
+      if (line.hasOption('n')) {
+        seekCount = Integer.parseInt(line.getOptionValue('n'));
+      }
+
+      if (line.hasOption('k')) {
+        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
+        minKeyLen = ir.from();
+        maxKeyLen = ir.to();
+      }
+
+      if (line.hasOption('v')) {
+        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
+        minValLength = ir.from();
+        maxValLength = ir.to();
+      }
+
+      if (line.hasOption('b')) {
+        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
+      }
+
+      if (line.hasOption('r')) {
+        rootDir = line.getOptionValue('r');
+      }
+      
+      if (line.hasOption('f')) {
+        file = line.getOptionValue('f');
+      }
+
+      if (line.hasOption('S')) {
+        seed = Long.parseLong(line.getOptionValue('S'));
+      }
+
+      if (line.hasOption('x')) {
+        String strOp = line.getOptionValue('x');
+        if (strOp.equals("r")) {
+          op = OP_READ;
+        }
+        else if (strOp.equals("w")) {
+          op = OP_CREATE;
+        }
+        else if (strOp.equals("rw")) {
+          op = OP_CREATE | OP_READ;
+        }
+        else {
+          throw new ParseException("Unknown action specifier: " + strOp);
+        }
+      }
+
+      proceed = true;
+    }
+
+    private void validateOptions() throws ParseException {
+      if (!compress.equals("none") && !compress.equals("lzo")
+          && !compress.equals("gz")) {
+        throw new ParseException("Unknown compression scheme: " + compress);
+      }
+
+      if (minKeyLen >= maxKeyLen) {
+        throw new ParseException(
+            "Max key length must be greater than min key length.");
+      }
+
+      if (minValLength >= maxValLength) {
+        throw new ParseException(
+            "Max value length must be greater than min value length.");
+      }
+
+      if (minWordLen >= maxWordLen) {
+        throw new ParseException(
+            "Max word length must be greater than min word length.");
+      }
+      return;
+    }
+
+    private void setStopProceed() {
+      proceed = false;
+    }
+
+    public boolean doCreate() {
+      return (op & OP_CREATE) != 0;
+    }
+
+    public boolean doRead() {
+      return (op & OP_READ) != 0;
+    }
+  }
+  
+  public static void main(String[] argv) throws IOException {
+    TestTFileSeek testCase = new TestTFileSeek();
+    MyOptions options = new MyOptions(argv);
+    
+    if (!options.proceed) {
+      return;
+    }
+
+    testCase.options = options;
+    testCase.setUp();
+    testCase.testSeeks();
+    testCase.tearDown();
+  }
+}