You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:05 UTC

[6/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
deleted file mode 100644
index 37bff4b..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import org.junit.Assert;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-public class DTFileTest
-{
-  private Configuration conf;
-  private Path path;
-  private FileSystem fs;
-  private NanoTimer timer;
-  private Random rng;
-  private RandomDistribution.DiscreteRNG keyLenGen;
-  private KVGenerator kvGen;
-
-
-  static class TestConf {
-    public int minWordLen = 5;
-    public int maxWordLen = 20;
-    public int dictSize = 1000;
-    int minKeyLen = 10;
-    int maxKeyLen = 50;
-    int minValLength = 100;
-    int maxValLength = 200;
-    int minBlockSize = 64 * 1024;
-    int fsOutputBufferSize = 1;
-    int fsInputBufferSize = 256 * 1024;
-    long fileSize = 3 * 1024 * 1024;
-    long seekCount = 1000;
-    String compress = "gz";
-
-  }
-
-  TestConf tconf = new TestConf();
-
-  /**
-   * Builds the Hadoop configuration, the target path and the random
-   * key/value generator used to populate the test file.
-   *
-   * @throws IOException if the file system for the test path cannot be obtained
-   */
-  public void setUp() throws IOException
-  {
-    conf = new Configuration();
-    conf.setInt("tfile.fs.input.buffer.size", tconf.fsInputBufferSize);
-    conf.setInt("tfile.fs.output.buffer.size", tconf.fsOutputBufferSize);
-
-    path = new Path("tmp/dtfile");
-    fs = path.getFileSystem(conf);
-    timer = new NanoTimer(false);
-    rng = new Random();
-
-    // Each distribution gets its own Random seeded from rng; seeds are drawn
-    // in the order key length, value length, word length.
-    Random keyLenRandom = new Random(rng.nextLong());
-    keyLenGen = new RandomDistribution.Zipf(keyLenRandom, tconf.minKeyLen, tconf.maxKeyLen, 1.2);
-
-    Random valLenRandom = new Random(rng.nextLong());
-    RandomDistribution.DiscreteRNG valueLengths =
-        new RandomDistribution.Flat(valLenRandom, tconf.minValLength, tconf.maxValLength);
-
-    Random wordLenRandom = new Random(rng.nextLong());
-    RandomDistribution.DiscreteRNG wordLengths =
-        new RandomDistribution.Flat(wordLenRandom, tconf.minWordLen, tconf.maxWordLen);
-
-    kvGen = new KVGenerator(rng, true, keyLenGen, valueLengths, wordLengths, tconf.dictSize);
-  }
-
-
-  /**
-   * Opens a fresh output stream at {@code name}, first removing whatever
-   * already exists at that path.
-   */
-  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
-      throws IOException {
-    boolean alreadyPresent = fs.exists(name);
-    if (alreadyPresent) {
-      fs.delete(name, true);
-    }
-    return fs.create(name);
-  }
-
-  int tuples = 0;
-
-  /**
-   * Appends generated values to the test file until its on-disk length,
-   * checked every 1000 rows, reaches {@code tconf.fileSize}. The number of
-   * appended records is accumulated in {@code tuples}.
-   *
-   * @throws IOException if writing to the file system fails
-   */
-  private void writeTFile() throws IOException
-  {
-
-    FSDataOutputStream fout = createFSOutput(path, fs);
-    // 16-byte key: the leading 8 bytes carry the row index, the rest stay zero.
-    byte[] key = new byte[16];
-    ByteBuffer bb = ByteBuffer.wrap(key);
-    try {
-      DTFile.Writer writer =
-          new DTFile.Writer(fout, tconf.minBlockSize, tconf.compress, "memcmp",
-              conf);
-      try {
-        BytesWritable tmpKey = new BytesWritable();
-        BytesWritable val = new BytesWritable();
-        for (long i = 0; true; ++i) {
-          if (i % 1000 == 0) { // test the size for every 1000 rows.
-            if (fs.getFileStatus(path).getLen() >= tconf.fileSize) {
-              break;
-            }
-          }
-          bb.clear();
-          bb.putLong(i);
-          // Only the generated value is stored; the generated key is ignored
-          // in favour of the index-based key built above.
-          kvGen.next(tmpKey, val, false);
-          // getBytes()/getLength() replace the deprecated get()/getSize().
-          writer.append(key, 0, key.length, val.getBytes(), 0, val
-              .getLength());
-          tuples++;
-        }
-      }
-      finally {
-        writer.close();
-      }
-    }
-    finally {
-      fout.close();
-    }
-
-    long fsize = fs.getFileStatus(path).getLen();
-
-    System.out.println("Total tuple wrote " + tuples + " File size " + fsize / (1024.0 * 1024));
-  }
-
-
-
-  /**
-   * Seeks to a random key, its successor and the last key, checking after
-   * each seek how the shared block cache size and its hit/miss counters
-   * changed, and that closing the reader empties the cache.
-   *
-   * @throws IOException if the test file cannot be opened or read
-   */
-  @Test
-  public void seekDTFile() throws IOException
-  {
-    Random random = new Random();
-    int ikey = random.nextInt(tuples);
-    byte[] key = new byte[16];
-    ByteBuffer bb = ByteBuffer.wrap(key);
-    bb.putLong(ikey);
-
-    FSDataInputStream fsdis = fs.open(path);
-    try {
-      CacheManager.setEnableStats(true);
-      Assert.assertEquals("Cache Contains no block", 0, CacheManager.getCacheSize());
-
-      DTFile.Reader reader = new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
-      DTFile.Reader.Scanner scanner = reader.createScanner();
-
-      /* Read first key in the file */
-      long numBlocks = CacheManager.getCacheSize();
-      scanner.lowerBound(key);
-      Assert.assertEquals("Cache contains some blocks ", numBlocks + 1, CacheManager.getCacheSize());
-
-      /* Next key does not add a new block in cache, it reads directly from cache */
-      // close scanner, so that it does not use its own cache.
-      scanner.close();
-      ikey++;
-      bb.clear();
-      bb.putLong(ikey);
-
-      numBlocks = CacheManager.getCacheSize();
-      long hit = CacheManager.getCache().stats().hitCount();
-      scanner.lowerBound(key);
-      Assert.assertEquals("Cache contains some blocks ", numBlocks, CacheManager.getCacheSize());
-      Assert.assertEquals("Cache hit ", hit + 1, CacheManager.getCache().stats().hitCount());
-
-      /* test cache miss */
-      // NOTE(review): this expects the last key to live in a block that is not
-      // cached yet, i.e. the random key above was not in the final block - confirm.
-      scanner.close();
-      hit = CacheManager.getCache().stats().hitCount();
-      long oldmiss = CacheManager.getCache().stats().missCount();
-      ikey = tuples - 1;
-      bb.clear();
-      bb.putLong(ikey);
-      numBlocks = CacheManager.getCacheSize();
-      scanner.lowerBound(key);
-      Assert.assertEquals("Cache contains one more blocks ", numBlocks + 1, CacheManager.getCacheSize());
-      Assert.assertEquals("No cache hit ", hit, CacheManager.getCache().stats().hitCount());
-      Assert.assertEquals("Cache miss", oldmiss + 1, CacheManager.getCache().stats().missCount());
-
-      Assert.assertEquals("Reverse lookup cache and block cache has same number of entries",
-          reader.readerBCF.getCacheKeys().size(), CacheManager.getCacheSize());
-      reader.close();
-      Assert.assertEquals("Cache blocks are deleted on reader close ", 0, CacheManager.getCacheSize());
-      Assert.assertEquals("Size of reverse lookup cache is zero ", 0, reader.readerBCF.getCacheKeys().size());
-    }
-    finally {
-      // The test opened this stream itself, so it closes it itself.
-      fsdis.close();
-    }
-  }
-
-  /**
-   * Invalidating keys that were never put in the cache must complete
-   * without throwing.
-   */
-  @Test
-  public void checkInvalidKeys()
-  {
-    List<String> absentKeys = new LinkedList<String>();
-    for (String name : new String[] {"One", "Two"}) {
-      absentKeys.add(name);
-    }
-    CacheManager.getCache().invalidateAll(absentKeys);
-  }
-
-  @Before
-  public void createDTfile() throws IOException
-  {
-    setUp();
-    writeTFile();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
deleted file mode 100644
index 49fedeb..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-
-/**
- * test tfile features.
- * 
- */
-public class TestDTFile extends TestCase {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-  private FileSystem fs;
-  private Configuration conf;
-  private static final int minBlockSize = 512;
-  private static final int largeVal = 3 * 1024 * 1024;
-  private static final String localFormatter = "%010d";
-
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    fs = FileSystem.get(conf);
-  }
-
-  @Override
-  public void tearDown() throws IOException {
-    // do nothing
-  }
-
-  /** Copies the key of the scanner's current entry into a newly allocated array. */
-  public byte[] readKey(Scanner scanner) throws IOException {
-    byte[] keyBytes = new byte[scanner.entry().getKeyLength()];
-    scanner.entry().getKey(keyBytes);
-    return keyBytes;
-  }
-
-  /** Copies the value of the scanner's current entry into a newly allocated array. */
-  public byte[] readValue(Scanner scanner) throws IOException {
-    byte[] valueBytes = new byte[scanner.entry().getValueLength()];
-    scanner.entry().getValue(valueBytes);
-    return valueBytes;
-  }
-
-  /**
-   * Reads exactly {@code len} bytes of the current entry's value through its
-   * value stream.
-   *
-   * @param scanner scanner positioned on the entry to read
-   * @param len number of bytes to read from the value stream
-   * @return the bytes read
-   * @throws IOException if the stream ends early or cannot be read
-   */
-  public byte[] readLongValue(Scanner scanner, int len) throws IOException {
-    DataInputStream din = scanner.entry().getValueStream();
-    try {
-      byte[] b = new byte[len];
-      din.readFully(b);
-      return b;
-    }
-    finally {
-      // Close the stream even when readFully fails.
-      din.close();
-    }
-  }
-
-  /**
-   * Appends records for indexes {@code start} to {@code start + n - 1}; each
-   * record is written twice with a zero-padded key and a "value"-prefixed value.
-   *
-   * @return the index following the last one written
-   */
-  private int writeSomeRecords(Writer writer, int start, int n)
-      throws IOException {
-    final int end = start + n;
-    for (int row = start; row < end; row++) {
-      String key = String.format(localFormatter, row);
-      String payload = "value" + key;
-      for (int copy = 0; copy < 2; copy++) {
-        writer.append(key.getBytes(), payload.getBytes());
-      }
-    }
-    return end;
-  }
-
-  /**
-   * Reads back the records produced by {@code writeSomeRecords} for indexes
-   * {@code start} to {@code start + n - 1} and checks the key and value bytes
-   * of both copies of every record.
-   *
-   * @return the index following the last one checked
-   */
-  private int readAndCheckbytes(Scanner scanner, int start, int n)
-      throws IOException {
-    String value = "value";
-    for (int i = start; i < (start + n); i++) {
-      String keyStr = String.format(localFormatter, i);
-      String valStr = value + keyStr;
-      // Every record was appended twice, so verify both copies and report
-      // expected/actual content for either one.
-      for (int copy = 0; copy < 2; copy++) {
-        byte[] key = readKey(scanner);
-        byte[] val = readValue(scanner);
-        assertTrue("bytes for keys do not match " + keyStr + " "
-            + new String(key), Arrays.equals(keyStr.getBytes(), key));
-        assertTrue("bytes for vals do not match " + valStr + " "
-            + new String(val), Arrays.equals(
-            valStr.getBytes(), val));
-        assertTrue(scanner.advance());
-      }
-    }
-    return (start + n);
-  }
-
-  // write some large records
-  // write them twice
-  private int writeLargeRecords(Writer writer, int start, int n)
-      throws IOException {
-    byte[] value = new byte[largeVal];
-    for (int i = start; i < (start + n); i++) {
-      String key = String.format(localFormatter, i);
-      writer.append(key.getBytes(), value);
-      writer.append(key.getBytes(), value);
-    }
-    return (start + n);
-  }
-
-  // read large records
-  // read them twice since its duplicated
-  private int readLargeRecords(Scanner scanner, int start, int n)
-      throws IOException {
-    for (int i = start; i < (start + n); i++) {
-      byte[] key = readKey(scanner);
-      String keyStr = String.format(localFormatter, i);
-      assertTrue("btyes for keys do not match", Arrays.equals(
-          keyStr.getBytes(), key));
-      scanner.advance();
-      key = readKey(scanner);
-      assertTrue("btyes for keys do not match", Arrays.equals(
-          keyStr.getBytes(), key));
-      scanner.advance();
-    }
-    return (start + n);
-  }
-
-  // write empty keys and values
-  private void writeEmptyRecords(Writer writer, int n) throws IOException {
-    byte[] key = new byte[0];
-    byte[] value = new byte[0];
-    for (int i = 0; i < n; i++) {
-      writer.append(key, value);
-    }
-  }
-
-  // read empty keys and values
-  private void readEmptyRecords(Scanner scanner, int n) throws IOException {
-    byte[] key = new byte[0];
-    byte[] value = new byte[0];
-    byte[] readKey = null;
-    byte[] readValue = null;
-    for (int i = 0; i < n; i++) {
-      readKey = readKey(scanner);
-      readValue = readValue(scanner);
-      assertTrue("failed to match keys", Arrays.equals(readKey, key));
-      assertTrue("failed to match values", Arrays.equals(readValue, value));
-      assertTrue("failed to advance cursor", scanner.advance());
-    }
-  }
-
-  private int writePrepWithKnownLength(Writer writer, int start, int n)
-      throws IOException {
-    // get the length of the key
-    String key = String.format(localFormatter, start);
-    int keyLen = key.getBytes().length;
-    String value = "value" + key;
-    int valueLen = value.getBytes().length;
-    for (int i = start; i < (start + n); i++) {
-      DataOutputStream out = writer.prepareAppendKey(keyLen);
-      String localKey = String.format(localFormatter, i);
-      out.write(localKey.getBytes());
-      out.close();
-      out = writer.prepareAppendValue(valueLen);
-      String localValue = "value" + localKey;
-      out.write(localValue.getBytes());
-      out.close();
-    }
-    return (start + n);
-  }
-
-  private int readPrepWithKnownLength(Scanner scanner, int start, int n)
-      throws IOException {
-    for (int i = start; i < (start + n); i++) {
-      String key = String.format(localFormatter, i);
-      byte[] read = readKey(scanner);
-      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
-      String value = "value" + key;
-      read = readValue(scanner);
-      assertTrue("values not equal", Arrays.equals(value.getBytes(), read));
-      scanner.advance();
-    }
-    return (start + n);
-  }
-
-  private int writePrepWithUnkownLength(Writer writer, int start, int n)
-      throws IOException {
-    for (int i = start; i < (start + n); i++) {
-      DataOutputStream out = writer.prepareAppendKey(-1);
-      String localKey = String.format(localFormatter, i);
-      out.write(localKey.getBytes());
-      out.close();
-      String value = "value" + localKey;
-      out = writer.prepareAppendValue(-1);
-      out.write(value.getBytes());
-      out.close();
-    }
-    return (start + n);
-  }
-
-  /**
-   * Intended counterpart of {@code writePrepWithUnkownLength}: for each index
-   * it would check the key, expect {@code readValue} to fail, and then read
-   * the value through the value stream.
-   *
-   * NOTE(review): the loop condition below is {@code i < start}, which is
-   * false on entry, so the body never executes and the method only returns
-   * {@code start + n}. Presumably {@code i < (start + n)} was intended -
-   * confirm before changing, since enabling the body also changes what this
-   * test exercises.
-   *
-   * @return {@code start + n}
-   */
-  private int readPrepWithUnknownLength(Scanner scanner, int start, int n)
-      throws IOException {
-    for (int i = start; i < start; i++) {
-      String key = String.format(localFormatter, i);
-      byte[] read = readKey(scanner);
-      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
-      try {
-        read = readValue(scanner);
-        assertTrue(false);
-      }
-      catch (IOException ie) {
-        // should have thrown exception
-      }
-      String value = "value" + key;
-      read = readLongValue(scanner, value.getBytes().length);
-      assertTrue("values nto equal", Arrays.equals(read, value.getBytes()));
-      scanner.advance();
-    }
-    return (start + n);
-  }
-
-  private byte[] getSomeKey(int rowId) {
-    return String.format(localFormatter, rowId).getBytes();
-  }
-
-  private void writeRecords(Writer writer) throws IOException {
-    writeEmptyRecords(writer, 10);
-    int ret = writeSomeRecords(writer, 0, 100);
-    ret = writeLargeRecords(writer, ret, 1);
-    ret = writePrepWithKnownLength(writer, ret, 40);
-    ret = writePrepWithUnkownLength(writer, ret, 50);
-    writer.close();
-  }
-
-  private void readAllRecords(Scanner scanner) throws IOException {
-    readEmptyRecords(scanner, 10);
-    int ret = readAndCheckbytes(scanner, 0, 100);
-    ret = readLargeRecords(scanner, ret, 1);
-    ret = readPrepWithKnownLength(scanner, ret, 40);
-    ret = readPrepWithUnknownLength(scanner, ret, 50);
-  }
-
-  /** Creates {@code name} for writing, deleting any existing entry at that path first. */
-  private FSDataOutputStream createFSOutput(Path name) throws IOException {
-    boolean alreadyPresent = fs.exists(name);
-    if (alreadyPresent) {
-      fs.delete(name, true);
-    }
-    return fs.create(name);
-  }
-
-  /**
-   * Writes the standard record mix with the given compression codec and the
-   * "memcmp" comparator, reads everything back, and then checks seekTo,
-   * lowerBound, upperBound and a key-range scanner.
-   *
-   * @param codec name of the compression codec handed to the writer
-   * @throws IOException if the file cannot be written or read
-   */
-  void basicWithSomeCodec(String codec) throws IOException {
-    Path ncTFile = new Path(ROOT, "basic.tfile");
-    FSDataOutputStream fout = createFSOutput(ncTFile);
-    Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf);
-    writeRecords(writer);
-    fout.close();
-    FSDataInputStream fin = fs.open(ncTFile);
-    // Hand fin to the reader instead of opening a second stream that
-    // nothing would ever close.
-    Reader reader =
-        new Reader(fin, fs.getFileStatus(ncTFile).getLen(), conf);
-
-    Scanner scanner = reader.createScanner();
-    readAllRecords(scanner);
-    scanner.seekTo(getSomeKey(50));
-    assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50)));
-    // read the key and see if it matches
-    byte[] readKey = readKey(scanner);
-    assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50),
-        readKey));
-
-    scanner.seekTo(new byte[0]);
-    byte[] val1 = readValue(scanner);
-    scanner.seekTo(new byte[0]);
-    byte[] val2 = readValue(scanner);
-    assertTrue(Arrays.equals(val1, val2));
-    
-    // check for lowerBound
-    scanner.lowerBound(getSomeKey(50));
-    assertTrue("locaton lookup failed", scanner.currentLocation
-        .compareTo(reader.end()) < 0);
-    readKey = readKey(scanner);
-    assertTrue("seeked key does not match", Arrays.equals(readKey,
-        getSomeKey(50)));
-
-    // check for upper bound
-    scanner.upperBound(getSomeKey(50));
-    assertTrue("location lookup failed", scanner.currentLocation
-        .compareTo(reader.end()) < 0);
-    readKey = readKey(scanner);
-    assertTrue("seeked key does not match", Arrays.equals(readKey,
-        getSomeKey(51)));
-
-    scanner.close();
-    // test for a range of scanner
-    scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
-    readAndCheckbytes(scanner, 10, 50);
-    assertFalse(scanner.advance());
-    scanner.close();
-    reader.close();
-    fin.close();
-    fs.delete(ncTFile, true);
-  }
-
-  /**
-   * Writes the standard record mix with the given compression codec and no
-   * comparator, then reads every record back in write order.
-   *
-   * @param codec name of the compression codec handed to the writer
-   * @throws IOException if the file cannot be written or read
-   */
-  void unsortedWithSomeCodec(String codec) throws IOException {
-    Path uTfile = new Path(ROOT, "unsorted.tfile");
-    FSDataOutputStream fout = createFSOutput(uTfile);
-    Writer writer = new Writer(fout, minBlockSize, codec, null, conf);
-    writeRecords(writer);
-    // writeRecords has already closed the writer; this second close is kept
-    // from the original test.
-    writer.close();
-    fout.close();
-    FSDataInputStream fin = fs.open(uTfile);
-    // Hand fin to the reader instead of opening a second stream that
-    // nothing would ever close.
-    Reader reader =
-        new Reader(fin, fs.getFileStatus(uTfile).getLen(), conf);
-
-    Scanner scanner = reader.createScanner();
-    readAllRecords(scanner);
-    scanner.close();
-    reader.close();
-    fin.close();
-    fs.delete(uTfile, true);
-  }
-
-  public void testTFileFeatures() throws IOException {
-    basicWithSomeCodec("none");
-    basicWithSomeCodec("gz");
-  }
-
-  // test unsorted t files.
-  public void testUnsortedTFileFeatures() throws IOException {
-    unsortedWithSomeCodec("none");
-    unsortedWithSomeCodec("gz");
-  }
-
-  private void writeNumMetablocks(Writer writer, String compression, int n)
-      throws IOException {
-    for (int i = 0; i < n; i++) {
-      DataOutputStream dout =
-          writer.prepareMetaBlock("TfileMeta" + i, compression);
-      byte[] b = ("something to test" + i).getBytes();
-      dout.write(b);
-      dout.close();
-    }
-  }
-
-  private void someTestingWithMetaBlock(Writer writer, String compression)
-      throws IOException {
-    DataOutputStream dout = null;
-    writeNumMetablocks(writer, compression, 10);
-    try {
-      dout = writer.prepareMetaBlock("TfileMeta1", compression);
-      assertTrue(false);
-    }
-    catch (MetaBlockAlreadyExists me) {
-      // avoid this exception
-    }
-    dout = writer.prepareMetaBlock("TFileMeta100", compression);
-    dout.close();
-  }
-
-  /**
-   * Reads meta blocks "TfileMeta0" to "TfileMeta(n-1)" and checks that each
-   * one starts with the payload "something to test" followed by its index.
-   *
-   * @param reader reader over a file containing the meta blocks
-   * @param n number of meta blocks to verify
-   * @throws IOException if a meta block cannot be opened or read
-   */
-  private void readNumMetablocks(Reader reader, int n) throws IOException {
-    for (int i = 0; i < n; i++) {
-      // Size the buffer from this block's own payload so indexes with more
-      // than one digit are compared in full.
-      byte[] expected = ("something to test" + i).getBytes();
-      DataInputStream din = reader.getMetaBlock("TfileMeta" + i);
-      try {
-        byte[] b = new byte[expected.length];
-        din.readFully(b);
-        assertTrue("failed to match metadata", Arrays.equals(expected, b));
-      }
-      finally {
-        // Close the stream even when the read or the comparison fails.
-        din.close();
-      }
-    }
-  }
-
-  private void someReadingWithMetaBlock(Reader reader) throws IOException {
-    DataInputStream din = null;
-    readNumMetablocks(reader, 10);
-    try {
-      din = reader.getMetaBlock("NO ONE");
-      assertTrue(false);
-    }
-    catch (MetaBlockDoesNotExist me) {
-      // should catch
-    }
-    din = reader.getMetaBlock("TFileMeta100");
-    int read = din.read();
-    assertTrue("check for status", (read == -1));
-    din.close();
-  }
-
-  /**
-   * Writes a file containing only meta blocks and reads them back.
-   *
-   * NOTE(review): the leading underscore presumably keeps the JUnit 3 runner
-   * from picking this method up as a test - confirm whether it is meant to
-   * stay disabled.
-   *
-   * @throws IOException if the file cannot be written or read
-   */
-  public void _testMetaBlocks() throws IOException {
-    Path mFile = new Path(ROOT, "meta.tfile");
-    FSDataOutputStream fout = createFSOutput(mFile);
-    Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
-    someTestingWithMetaBlock(writer, "none");
-    writer.close();
-    fout.close();
-    FSDataInputStream fin = fs.open(mFile);
-    Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
-    someReadingWithMetaBlock(reader);
-    // Close the reader and stream before deleting the file, matching the
-    // order used by the other tests in this class.
-    reader.close();
-    fin.close();
-    fs.delete(mFile, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
deleted file mode 100644
index a1fa5c8..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Location;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * 
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- * 
- */
-public class TestDTFileByteArrays {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-  private final static int BLOCK_SIZE = 512;
-  private final static int BUF_SIZE = 64;
-  private final static int K = 1024;
-  protected boolean skip = false;
-
-  private static final String KEY = "key";
-  private static final String VALUE = "value";
-
-  private FileSystem fs;
-  private Configuration conf = new Configuration();
-  private Path path;
-  private FSDataOutputStream out;
-  private Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String comparator = "memcmp";
-  private final String outputFile = getClass().getSimpleName();
-
-  /*
-   * pre-sampled numbers of records in one block, based on the given the
-   * generated key and value strings. This is slightly different based on
-   * whether or not the native libs are present.
-   */
-  private boolean usingNative = ZlibFactory.isNativeZlibLoaded(conf);
-  private int records1stBlock = usingNative ? 5674 : 4480;
-  private int records2ndBlock = usingNative ? 5574 : 4263;
-
-  public void init(String compression, String comparator,
-      int numRecords1stBlock, int numRecords2ndBlock) {
-    init(compression, comparator);
-    this.records1stBlock = numRecords1stBlock;
-    this.records2ndBlock = numRecords2ndBlock;
-  }
-  
-  public void init(String compression, String comparator) {
-    this.compression = compression;
-    this.comparator = comparator;
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
-  }
-
-  /** Deletes the per-test output file unless the test class is being skipped. */
-  @After
-  public void tearDown() throws IOException {
-    if (skip) {
-      return;
-    }
-    fs.delete(path, true);
-  }
-
-  @Test
-  public void testNoDataEntry() throws IOException {
-    if (skip) 
-      return;
-    closeOutput();
-
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Assert.assertTrue(reader.isSorted());
-    Scanner scanner = reader.createScanner();
-    Assert.assertTrue(scanner.atEnd());
-    scanner.close();
-    reader.close();
-  }
-
-  @Test
-  public void testOneDataEntry() throws IOException {
-    if (skip)
-      return;
-    writeRecords(1);
-    readRecords(1);
-
-    checkBlockIndex(0, 0);
-    readValueBeforeKey(0);
-    readKeyWithoutValue(0);
-    readValueWithoutKey(0);
-    readKeyManyTimes(0);
-  }
-
-  @Test
-  public void testTwoDataEntries() throws IOException {
-    if (skip)
-      return;
-    writeRecords(2);
-    readRecords(2);
-  }
-
-  /**
-   * Fill up exactly one block.
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testOneBlock() throws IOException {
-    if (skip)
-      return;
-    // just under one block
-    writeRecords(records1stBlock);
-    readRecords(records1stBlock);
-    // last key should be in the first block (block 0)
-    checkBlockIndex(records1stBlock - 1, 0);
-  }
-
-  /**
-   * One block plus one record.
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testOneBlockPlusOneEntry() throws IOException {
-    if (skip)
-      return;
-    writeRecords(records1stBlock + 1);
-    readRecords(records1stBlock + 1);
-    checkBlockIndex(records1stBlock - 1, 0);
-    checkBlockIndex(records1stBlock, 1);
-  }
-
-  @Test
-  public void testTwoBlocks() throws IOException {
-    if (skip)
-      return;
-    writeRecords(records1stBlock + 5);
-    readRecords(records1stBlock + 5);
-    checkBlockIndex(records1stBlock + 4, 1);
-  }
-
-  @Test
-  public void testThreeBlocks() throws IOException {
-    if (skip) 
-      return;
-    writeRecords(2 * records1stBlock + 5);
-    readRecords(2 * records1stBlock + 5);
-
-    checkBlockIndex(2 * records1stBlock + 4, 2);
-    // 1st key in file
-    readValueBeforeKey(0);
-    readKeyWithoutValue(0);
-    readValueWithoutKey(0);
-    readKeyManyTimes(0);
-    // last key in file
-    readValueBeforeKey(2 * records1stBlock + 4);
-    readKeyWithoutValue(2 * records1stBlock + 4);
-    readValueWithoutKey(2 * records1stBlock + 4);
-    readKeyManyTimes(2 * records1stBlock + 4);
-
-    // 1st key in mid block, verify block indexes then read
-    checkBlockIndex(records1stBlock - 1, 0);
-    checkBlockIndex(records1stBlock, 1);
-    readValueBeforeKey(records1stBlock);
-    readKeyWithoutValue(records1stBlock);
-    readValueWithoutKey(records1stBlock);
-    readKeyManyTimes(records1stBlock);
-
-    // last key in mid block, verify block indexes then read
-    checkBlockIndex(records1stBlock + records2ndBlock
-        - 1, 1);
-    checkBlockIndex(records1stBlock + records2ndBlock, 2);
-    readValueBeforeKey(records1stBlock
-        + records2ndBlock - 1);
-    readKeyWithoutValue(records1stBlock
-        + records2ndBlock - 1);
-    readValueWithoutKey(records1stBlock
-        + records2ndBlock - 1);
-    readKeyManyTimes(records1stBlock + records2ndBlock
-        - 1);
-
-    // mid in mid block
-    readValueBeforeKey(records1stBlock + 10);
-    readKeyWithoutValue(records1stBlock + 10);
-    readValueWithoutKey(records1stBlock + 10);
-    readKeyManyTimes(records1stBlock + 10);
-  }
-
-  /**
-   * Seeks the scanner to {@code key} and returns the resulting location, or
-   * the scanner's end location when the seek reports no match.
-   */
-  Location locate(Scanner scanner, byte[] key) throws IOException {
-    boolean found = scanner.seekTo(key);
-    return found ? scanner.currentLocation : scanner.endLocation;
-  }
-  
-  @Test
-  public void testLocate() throws IOException {
-    if (skip)
-      return;
-    writeRecords(3 * records1stBlock);
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    locate(scanner, composeSortedKey(KEY, 2).getBytes());
-    locate(scanner, composeSortedKey(KEY, records1stBlock - 1).getBytes());
-    locate(scanner, composeSortedKey(KEY, records1stBlock).getBytes());
-    Location locX = locate(scanner, "keyX".getBytes());
-    Assert.assertEquals(scanner.endLocation, locX);
-    scanner.close();
-    reader.close();
-  }
-
-  /**
-   * Opening a reader on the file while its writer is still open is expected
-   * to fail with an {@link IOException}.
-   *
-   * @throws IOException if closing the reader or the input stream fails
-   */
-  @Test
-  public void testFailureWriterNotClosed() throws IOException {
-    if (skip)
-      return;
-    Reader reader = null;
-    // Fully qualified because this file does not import FSDataInputStream.
-    org.apache.hadoop.fs.FSDataInputStream in = null;
-    try {
-      in = fs.open(path);
-      reader = new Reader(in, fs.getFileStatus(path).getLen(), conf);
-      Assert.fail("Cannot read before closing the writer.");
-    } catch (IOException e) {
-      // noop, expecting exceptions
-    } finally {
-      try {
-        if (reader != null) {
-          reader.close();
-        }
-      } finally {
-        // Release the stream opened above; previously it was never closed.
-        if (in != null) {
-          in.close();
-        }
-      }
-    }
-  }
-
-  /**
-   * Preparing a second meta block under an already used name is expected to
-   * throw.
-   */
-  @Test
-  public void testFailureWriteMetaBlocksWithSameName() throws IOException {
-    if (skip) {
-      return;
-    }
-    writer.append("keyX".getBytes(), "valueX".getBytes());
-
-    // first meta block named "testX"
-    DataOutputStream firstMeta =
-        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
-    firstMeta.write(123);
-    firstMeta.write("foo".getBytes());
-    firstMeta.close();
-
-    // second meta block reusing the same name
-    try {
-      writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
-      Assert.fail("Cannot create metablocks with the same name.");
-    } catch (Exception expected) {
-      // the duplicate name is supposed to be rejected
-    }
-    closeOutput();
-  }
-
-  /**
-   * Writes one record and one meta block named "testX", then checks that
-   * "testX" can be read back while requesting the never-written "testY"
-   * throws.
-   */
-  @Test
-  public void testFailureGetNonExistentMetaBlock() throws IOException {
-    if (skip)
-      return;
-    writer.append("keyX".getBytes(), "valueX".getBytes());
-
-    // create a new metablock
-    DataOutputStream outMeta =
-        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
-    outMeta.write(123);
-    outMeta.write("foo".getBytes());
-    outMeta.close();
-    closeOutput();
-
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    try {
-      DataInputStream mb = reader.getMetaBlock("testX");
-      Assert.assertNotNull(mb);
-      mb.close();
-      try {
-        // the returned stream is irrelevant; only the exception matters
-        reader.getMetaBlock("testY");
-        Assert.fail("Error on handling non-existent metablocks.");
-      } catch (Exception e) {
-        // noop, expecting exceptions
-      }
-    } finally {
-      // close the reader even when an assertion above fails
-      reader.close();
-    }
-  }
-
-  /**
-   * Appending a key/value pair after a meta block has been written is expected
-   * to throw.
-   */
-  @Test
-  public void testFailureWriteRecordAfterMetaBlock() throws IOException {
-    if (skip) {
-      return;
-    }
-    // one regular record first
-    writer.append("keyX".getBytes(), "valueX".getBytes());
-
-    // then a complete meta block
-    DataOutputStream metaStream =
-        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
-    metaStream.write(123);
-    metaStream.write("dummy".getBytes());
-    metaStream.close();
-
-    // a further record must now be refused
-    try {
-      writer.append("keyY".getBytes(), "valueY".getBytes());
-      Assert.fail("Cannot add key/value after start adding meta blocks.");
-    } catch (Exception expected) {
-      // the late append is supposed to be rejected
-    }
-    closeOutput();
-  }
-
-  /**
-   * Reads the value of the first entry once, then checks that reading the
-   * value of the same entry a second time throws.
-   */
-  @Test
-  public void testFailureReadValueManyTimes() throws IOException {
-    if (skip)
-      return;
-    writeRecords(5);
-
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    try {
-      byte[] vbuf = new byte[BUF_SIZE];
-      int vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      // expected value first, so a failure message is labelled correctly
-      Assert.assertEquals(VALUE + 0, new String(vbuf, 0, vlen));
-      try {
-        scanner.entry().getValue(vbuf);
-        Assert.fail("Cannot get the value multiple times.");
-      } catch (Exception e) {
-        // noop, expecting exceptions
-      }
-    } finally {
-      // release resources even when an assertion above fails
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Creating a writer with the compression name "BAD" is expected to throw.
-   */
-  @Test
-  public void testFailureBadCompressionCodec() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-    out = fs.create(path);
-    try {
-      writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
-      Assert.fail("Error on handling invalid compression codecs.");
-    } catch (Exception expected) {
-      // the unknown codec name is supposed to be rejected
-    }
-  }
-
-  /**
-   * Opening a reader on a zero-length file is expected to fail with an
-   * EOFException.
-   */
-  @Test
-  public void testFailureOpenEmptyFile() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-
-    // replace the output with a file that contains no bytes at all
-    path = new Path(fs.getWorkingDirectory(), outputFile);
-    out = fs.create(path);
-    out.close();
-
-    try {
-      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-      Assert.fail("Error on handling empty files.");
-    } catch (EOFException expected) {
-      // the empty file is supposed to be rejected
-    }
-  }
-
-  /**
-   * Opening a reader on a file filled with random bytes is expected to fail
-   * with an IOException.
-   */
-  @Test
-  public void testFailureOpenRandomFile() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-
-    // replace the output with K + 2 chunks of K random bytes each
-    path = new Path(fs.getWorkingDirectory(), outputFile);
-    out = fs.create(path);
-    Random random = new Random();
-    byte[] chunk = new byte[K];
-    int chunksWritten = 0;
-    while (chunksWritten < K + 2) {
-      random.nextBytes(chunk);
-      out.write(chunk);
-      chunksWritten++;
-    }
-    out.close();
-
-    try {
-      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-      Assert.fail("Error on handling random files.");
-    } catch (IOException expected) {
-      // the random content is supposed to be rejected
-    }
-  }
-
-  /**
-   * Appends a random key of 64 * K + 1 bytes and tolerates an
-   * IndexOutOfBoundsException from the writer.
-   */
-  @Test
-  public void testFailureKeyLongerThan64K() throws IOException {
-    if (skip) {
-      return;
-    }
-    byte[] oversizedKey = new byte[64 * K + 1];
-    new Random().nextBytes(oversizedKey);
-    try {
-      // NOTE(review): there is no Assert.fail after this call, so the test also
-      // passes when the append succeeds - confirm whether that is intended.
-      writer.append(oversizedKey, "valueX".getBytes());
-    } catch (IndexOutOfBoundsException expected) {
-      // tolerated: the key is too long
-    }
-    closeOutput();
-  }
-
-  /**
-   * Appending "keyA" after "keyM" is expected to throw.
-   */
-  @Test
-  public void testFailureOutOfOrderKeys() throws IOException {
-    if (skip) {
-      return;
-    }
-    try {
-      writer.append("keyM".getBytes(), "valueM".getBytes());
-      writer.append("keyA".getBytes(), "valueA".getBytes());
-      Assert.fail("Error on handling out of order keys.");
-    } catch (Exception expected) {
-      // the out-of-order key is supposed to be rejected
-    }
-
-    closeOutput();
-  }
-
-  /**
-   * Appending with a key offset of -1 is expected to throw.
-   */
-  @Test
-  public void testFailureNegativeOffset() throws IOException {
-    if (skip) {
-      return;
-    }
-    try {
-      writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
-      Assert.fail("Error on handling negative offset.");
-    } catch (Exception expected) {
-      // the negative key offset is supposed to be rejected
-    }
-    closeOutput();
-  }
-
-  /**
-   * Calling lowerBound on a scanner with a key offset of -1 is expected to
-   * throw.
-   */
-  @Test
-  public void testFailureNegativeOffset_2() throws IOException {
-    if (skip)
-      return;
-    closeOutput();
-
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    try {
-      scanner.lowerBound("keyX".getBytes(), -1, 4);
-      Assert.fail("Error on handling negative offset.");
-    } catch (Exception e) {
-      // noop, expecting exceptions
-    } finally {
-      // close the scanner before the reader that created it, as the other
-      // tests in this class do
-      scanner.close();
-      reader.close();
-    }
-    closeOutput();
-  }
-
-  /**
-   * Appending with a key length of -1 is expected to throw.
-   */
-  @Test
-  public void testFailureNegativeLength() throws IOException {
-    if (skip) {
-      return;
-    }
-    try {
-      writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
-      Assert.fail("Error on handling negative length.");
-    } catch (Exception expected) {
-      // the negative key length is supposed to be rejected
-    }
-    closeOutput();
-  }
-
-  /**
-   * Calling lowerBound on a scanner with a key length of -1 is expected to
-   * throw.
-   */
-  @Test
-  public void testFailureNegativeLength_2() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-
-    Reader fileReader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner fileScanner = fileReader.createScanner();
-    try {
-      fileScanner.lowerBound("keyX".getBytes(), 0, -1);
-      Assert.fail("Error on handling negative length.");
-    } catch (Exception expected) {
-      // the negative key length is supposed to be rejected
-    } finally {
-      fileScanner.close();
-      fileReader.close();
-    }
-    closeOutput();
-  }
-
-  /**
-   * Checks that seekTo throws for a negative key offset as well as for a
-   * negative key length (despite the method name, both cases are covered).
-   */
-  @Test
-  public void testFailureNegativeLength_3() throws IOException {
-    if (skip)
-      return;
-    writeRecords(3);
-
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    try {
-      // test negative array offset
-      try {
-        scanner.seekTo("keyY".getBytes(), -1, 4);
-        Assert.fail("Failed to handle negative offset.");
-      } catch (Exception e) {
-        // noop, expecting exceptions
-      }
-
-      // test negative array length
-      try {
-        scanner.seekTo("keyY".getBytes(), 0, -2);
-        Assert.fail("Failed to handle negative key length.");
-      } catch (Exception e) {
-        // noop, expecting exceptions
-      }
-    } finally {
-      // close the scanner before the reader that created it, as the other
-      // tests in this class do
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Writes 10 * records1stBlock records without closing the output and, unless
-   * the configured compression is NONE, asserts that the output position is
-   * smaller than the raw data size.
-   */
-  @Test
-  public void testFailureCompressionNotWorking() throws IOException {
-    if (skip) {
-      return;
-    }
-    long rawDataSize = writeRecords(10 * records1stBlock, false);
-    boolean uncompressed =
-        compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName());
-    if (!uncompressed) {
-      Assert.assertTrue(out.getPos() < rawDataSize);
-    }
-    closeOutput();
-  }
-
-  /**
-   * Creating a writer on an output stream that already had a byte written to
-   * it is expected to throw.
-   */
-  @Test
-  public void testFailureFileWriteNotAt0Position() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-
-    // reopen the output and write one byte before handing it to the writer
-    out = fs.create(path);
-    out.write(123);
-
-    try {
-      writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
-      Assert.fail("Failed to catch file write not at position 0.");
-    } catch (Exception expected) {
-      // the already-written stream is supposed to be rejected
-    }
-    closeOutput();
-  }
-
-  /**
-   * Writes {@code count} records and closes the output afterwards.
-   *
-   * @param count number of records to write
-   * @return the value returned by {@code writeRecords(count, true)}
-   * @throws IOException if writing or closing fails
-   */
-  private long writeRecords(int count) throws IOException {
-    return writeRecords(count, true);
-  }
-
-  /**
-   * Writes {@code count} records through the test's writer and optionally
-   * closes the output.
-   *
-   * @param count number of records to write
-   * @param close whether to call {@code closeOutput()} after writing
-   * @return the raw data size reported by {@code writeRecords(writer, count)}
-   * @throws IOException if writing or closing fails
-   */
-  private long writeRecords(int count, boolean close) throws IOException {
-    long rawDataSize = writeRecords(writer, count);
-    if (close) {
-      closeOutput();
-    }
-    return rawDataSize;
-  }
-
-  /**
-   * Appends {@code count} records to the writer. Record {@code i} has the key
-   * {@code composeSortedKey(KEY, i)} and the value {@code VALUE + i}.
-   *
-   * @param writer writer to append to
-   * @param count number of records to append
-   * @return sum over all records of the key and value byte lengths plus the
-   *         VInt sizes of those lengths
-   * @throws IOException if an append fails
-   */
-  static long writeRecords(Writer writer, int count) throws IOException {
-    long totalBytes = 0;
-    for (int index = 0; index < count; index++) {
-      byte[] keyBytes = composeSortedKey(KEY, index).getBytes();
-      byte[] valueBytes = (VALUE + index).getBytes();
-      writer.append(keyBytes, valueBytes);
-
-      int keySize = WritableUtils.getVIntSize(keyBytes.length) + keyBytes.length;
-      int valueSize = WritableUtils.getVIntSize(valueBytes.length) + valueBytes.length;
-      totalBytes += keySize + valueSize;
-    }
-    return totalBytes;
-  }
-
-  /**
-   * Builds a key from a prefix followed by the value zero-padded to a width of
-   * ten, so that keys built from increasing non-negative values sort in the
-   * same order as the values.
-   *
-   * @param prefix text placed in front of the padded number
-   * @param value number to pad and append
-   * @return the prefix followed by the padded number
-   */
-  static String composeSortedKey(String prefix, int value) {
-    String paddedValue = String.format("%010d", value);
-    return prefix + paddedValue;
-  }
-
-  /**
-   * Reads back and verifies {@code count} records from this test's file, using
-   * the test's file system, path and configuration.
-   *
-   * @param count number of records expected in the file
-   * @throws IOException if reading fails
-   */
-  private void readRecords(int count) throws IOException {
-    readRecords(fs, path, count, conf);
-  }
-
-  /**
-   * Scans the file from the beginning and asserts that it holds exactly
-   * {@code count} records, where record {@code i} has the key
-   * {@code composeSortedKey(KEY, i)} and the value {@code VALUE + i}.
-   *
-   * @param fs file system that holds the file
-   * @param path file to read
-   * @param count number of records expected
-   * @param conf configuration passed to the reader
-   * @throws IOException if reading fails
-   */
-  static void readRecords(FileSystem fs, Path path, int count,
-      Configuration conf) throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-
-    try {
-      for (int nx = 0; nx < count; nx++, scanner.advance()) {
-        Assert.assertFalse(scanner.atEnd());
-
-        // expected value goes first so that failure messages are labelled correctly
-        byte[] kbuf = new byte[BUF_SIZE];
-        int klen = scanner.entry().getKeyLength();
-        scanner.entry().getKey(kbuf);
-        Assert.assertEquals(composeSortedKey(KEY, nx), new String(kbuf, 0, klen));
-
-        byte[] vbuf = new byte[BUF_SIZE];
-        int vlen = scanner.entry().getValueLength();
-        scanner.entry().getValue(vbuf);
-        Assert.assertEquals(VALUE + nx, new String(vbuf, 0, vlen));
-      }
-
-      // nothing may follow the last expected record
-      Assert.assertTrue(scanner.atEnd());
-      Assert.assertFalse(scanner.advance());
-    } finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Seeks to the key of the given record and asserts that the scanner's
-   * current location reports the expected block index.
-   *
-   * @param recordIndex index of the record whose key is looked up
-   * @param blockIndexExpected block index the scanner should end up in
-   * @throws IOException if reading fails
-   */
-  private void checkBlockIndex(int recordIndex, int blockIndexExpected) throws IOException {
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    try {
-      scanner.seekTo(composeSortedKey(KEY, recordIndex).getBytes());
-      Assert.assertEquals(blockIndexExpected, scanner.currentLocation
-          .getBlockIndex());
-    } finally {
-      // release resources even when the assertion fails
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Creates a scanner starting at the key of the given record and verifies
-   * that the entry's value can be read before its key.
-   *
-   * @param recordIndex index of the record to read
-   * @throws IOException if reading fails
-   */
-  private void readValueBeforeKey(int recordIndex)
-      throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner =
-        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
-            .getBytes(), null);
-
-    try {
-      // value first; expected values are passed as the first assertEquals argument
-      byte[] vbuf = new byte[BUF_SIZE];
-      int vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      Assert.assertEquals(VALUE + recordIndex, new String(vbuf, 0, vlen));
-
-      byte[] kbuf = new byte[BUF_SIZE];
-      int klen = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf);
-      Assert.assertEquals(composeSortedKey(KEY, recordIndex),
-          new String(kbuf, 0, klen));
-    } finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Creates a scanner starting at the key of the given record and verifies
-   * that keys can be read without reading the values: first the key of that
-   * record and, if the scanner can advance, the key of the following record.
-   *
-   * @param recordIndex index of the record to start at
-   * @throws IOException if reading fails
-   */
-  private void readKeyWithoutValue(int recordIndex)
-      throws IOException {
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner =
-        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
-            .getBytes(), null);
-
-    try {
-      // read the indexed key; expected values go first in assertEquals
-      byte[] kbuf1 = new byte[BUF_SIZE];
-      int klen1 = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf1);
-      Assert.assertEquals(composeSortedKey(KEY, recordIndex),
-          new String(kbuf1, 0, klen1));
-
-      if (scanner.advance() && !scanner.atEnd()) {
-        // read the next key following the indexed
-        byte[] kbuf2 = new byte[BUF_SIZE];
-        int klen2 = scanner.entry().getKeyLength();
-        scanner.entry().getKey(kbuf2);
-        Assert.assertEquals(composeSortedKey(KEY, recordIndex + 1),
-            new String(kbuf2, 0, klen2));
-      }
-    } finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Creates a scanner starting at the key of the given record and verifies
-   * that values can be read without reading the keys: first the value of that
-   * record and, if the scanner can advance, the value of the following record.
-   *
-   * @param recordIndex index of the record to start at
-   * @throws IOException if reading fails
-   */
-  private void readValueWithoutKey(int recordIndex)
-      throws IOException {
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-
-    Scanner scanner =
-        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
-            .getBytes(), null);
-
-    try {
-      // expected values go first in assertEquals
-      byte[] vbuf1 = new byte[BUF_SIZE];
-      int vlen1 = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf1);
-      Assert.assertEquals(VALUE + recordIndex, new String(vbuf1, 0, vlen1));
-
-      if (scanner.advance() && !scanner.atEnd()) {
-        byte[] vbuf2 = new byte[BUF_SIZE];
-        int vlen2 = scanner.entry().getValueLength();
-        scanner.entry().getValue(vbuf2);
-        Assert.assertEquals(VALUE + (recordIndex + 1),
-            new String(vbuf2, 0, vlen2));
-      }
-    } finally {
-      // release resources even when an assertion above fails
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Creates a scanner starting at the key of the given record and verifies
-   * that the key of the same entry can be read three times in a row.
-   *
-   * @param recordIndex index of the record to read
-   * @throws IOException if reading fails
-   */
-  private void readKeyManyTimes(int recordIndex) throws IOException {
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-
-    Scanner scanner =
-        reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
-            .getBytes(), null);
-
-    try {
-      // read the indexed key repeatedly into the same buffer
-      byte[] kbuf1 = new byte[BUF_SIZE];
-      for (int attempt = 0; attempt < 3; attempt++) {
-        int klen1 = scanner.entry().getKeyLength();
-        scanner.entry().getKey(kbuf1);
-        // expected value goes first in assertEquals
-        Assert.assertEquals(composeSortedKey(KEY, recordIndex),
-            new String(kbuf1, 0, klen1));
-      }
-    } finally {
-      // release resources even when an assertion above fails
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  /**
-   * Closes the writer and the underlying output stream if they are open and
-   * clears the corresponding fields. The stream is closed even when closing
-   * the writer throws.
-   *
-   * @throws IOException if closing the writer or the stream fails
-   */
-  private void closeOutput() throws IOException {
-    try {
-      if (writer != null) {
-        writer.close();
-        writer = null;
-      }
-    } finally {
-      // do not leave the stream open when writer.close() fails
-      if (out != null) {
-        out.close();
-        out = null;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
deleted file mode 100644
index c313813..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Writes LongWritable keys with the LongWritable.Comparator jclass comparator
- * and checks that a full scan returns the values in the order they were
- * written.
- */
-public class TestTFileComparator2 {
-  private static final String ROOT = System.getProperty("test.build.data",
-      "/tmp/tfile-test");
-  private static final String name = "test-tfile-comparator2";
-  private final static int BLOCK_SIZE = 512;
-  private static final String VALUE = "value";
-  private static final String jClassLongWritableComparator = "jclass:"
-      + LongWritable.Comparator.class.getName();
-  private static final long NENTRY = 10000;
-
-  /** Returns n cubed; used to produce keys that increase with n, including negative ones. */
-  private static long cube(long n) {
-    return n*n*n;
-  }
-
-  /** Returns the value stored for the i-th entry: VALUE, a dash and i. */
-  private static String buildValue(long i) {
-    return String.format("%s-%d", VALUE, i);
-  }
-
-  /**
-   * Writes NENTRY entries whose keys are cube(i - NENTRY/2) for increasing i,
-   * then scans the file and asserts that the i-th value read is buildValue(i)
-   * and that exactly NENTRY entries are returned.
-   */
-  @Test
-  public void testSortedLongWritable() throws IOException {
-    Configuration conf = new Configuration();
-    Path path = new Path(ROOT, name);
-    FileSystem fs = path.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(path);
-    try {
-      DTFile.Writer writer = new Writer(out, BLOCK_SIZE, "gz",
-          jClassLongWritableComparator, conf);
-      try {
-        LongWritable key = new LongWritable(0);
-        for (long i=0; i<NENTRY; ++i) {
-          key.set(cube(i-NENTRY/2));
-          DataOutputStream dos = writer.prepareAppendKey(-1);
-          try {
-            key.write(dos);
-          } finally {
-            dos.close();
-          }
-          dos = writer.prepareAppendValue(-1);
-          try {
-            dos.write(buildValue(i).getBytes());
-          } finally {
-            dos.close();
-          }
-        }
-      } finally {
-        writer.close();
-      }
-    } finally {
-      out.close();
-    }
-
-    FSDataInputStream in = fs.open(path);
-    try {
-      DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path)
-          .getLen(), conf);
-      try {
-        DTFile.Reader.Scanner scanner = reader.createScanner();
-        try {
-          long i=0;
-          BytesWritable value = new BytesWritable();
-          for (; !scanner.atEnd(); scanner.advance()) {
-            scanner.entry().getValue(value);
-            assertEquals(buildValue(i), new String(value.getBytes(), 0, value
-                .getLength()));
-            ++i;
-          }
-          // guard against a scan that ends early and passes vacuously
-          assertEquals(NENTRY, i);
-        } finally {
-          scanner.close();
-        }
-      } finally {
-        reader.close();
-      }
-    } finally {
-      in.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
deleted file mode 100644
index 0a10468..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.junit.Assert;
-
-/**
- * Checks that constructing a DTFile Writer with an unsupported comparator
- * specification throws. All tests use the GZ compression codec.
- */
-public class TestTFileComparators extends TestCase {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private final static int BLOCK_SIZE = 512;
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private FSDataOutputStream out;
-  private Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String outputFile = "TFileTestComparators";
-  /*
-   * pre-sampled numbers of records in one block, based on the given the
-   * generated key and value strings
-   */
-  // private int records1stBlock = 4314;
-  // private int records2ndBlock = 4108;
-  private int records1stBlock = 4480;
-  private int records2ndBlock = 4263;
-
-  /** Creates the output file that each test hands to the Writer constructor. */
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-  }
-
-  /** Closes whatever the test left open and deletes the output file. */
-  @Override
-  public void tearDown() throws IOException {
-    try {
-      // the stream opened in setUp must not be left open
-      closeOutput();
-    } finally {
-      fs.delete(path, true);
-    }
-  }
-
-  // bad comparator format
-  public void testFailureBadComparatorNames() throws IOException {
-    try {
-      writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf);
-      Assert.fail("Failed to catch unsupported comparator names");
-    } catch (Exception e) {
-      // noop, expecting exceptions
-    }
-  }
-
-  // jclass that doesn't exist
-  public void testFailureBadJClassNames() throws IOException {
-    try {
-      writer =
-          new Writer(out, BLOCK_SIZE, compression,
-              "jclass: some.non.existence.clazz", conf);
-      Assert.fail("Failed to catch unsupported comparator names");
-    } catch (Exception e) {
-      // noop, expecting exceptions
-    }
-  }
-
-  // class exists but not a RawComparator
-  public void testFailureBadJClasses() throws IOException {
-    try {
-      writer =
-          new Writer(out, BLOCK_SIZE, compression,
-              "jclass:org.apache.hadoop.io.file.tfile.Chunk", conf);
-      Assert.fail("Failed to catch unsupported comparator names");
-    } catch (Exception e) {
-      // noop, expecting exceptions
-    }
-  }
-
-  /**
-   * Closes the writer and the output stream if they are open and clears the
-   * corresponding fields.
-   */
-  private void closeOutput() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-    if (out != null) {
-      out.close();
-      out = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
deleted file mode 100644
index 301cffc..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * Runs the test cases inherited from TestDTFileByteArrays with the GZ
- * compression codec and a comparator specified by Java class name
- * (MyComparator, declared in this file).
- */
-
-public class TestTFileJClassComparatorByteArrays extends TestDTFileByteArrays {
-  /**
-   * Initializes the inherited test with the GZ codec and the "jclass:"
-   * comparator specification before running the parent's setUp.
-   */
-  @Override
-  public void setUp() throws IOException {
-    init(Compression.Algorithm.GZ.getName(),
-        "jclass: org.apache.hadoop.io.file.tfile.MyComparator");
-    super.setUp();
-  }
-}
-
-/**
- * Raw comparator over byte arrays that delegates both comparison methods to
- * WritableComparator.compareBytes. It is referenced by class name from
- * TestTFileJClassComparatorByteArrays.
- */
-class MyComparator implements RawComparator<byte[]>, Serializable {
-
-  /** Compares the byte ranges [s1, s1 + l1) of b1 and [s2, s2 + l2) of b2. */
-  @Override
-  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-    return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
-  }
-
-  /** Compares the two arrays over their full lengths. */
-  @Override
-  public int compare(byte[] o1, byte[] o2) {
-    return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
-  }
-  
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
deleted file mode 100644
index 20cff9e..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-/**
- * Runs the test cases inherited from TestDTFileByteArrays with the LZO
- * compression codec; the tests are skipped when LZO is not supported.
- */
-public class TestTFileLzoCodecsByteArrays extends TestDTFileByteArrays {
-  /**
-   * Test LZO compression codec, using the same test cases as in the ByteArrays.
-   * Sets the skip flag when Algorithm.LZO.isSupported() is false and only runs
-   * the parent's setUp when the tests are not skipped.
-   */
-  @Override
-  public void setUp() throws IOException {
-    skip = !(Algorithm.LZO.isSupported());
-    if (skip) {
-      System.out.println("Skipped");
-    }
-
-    // TODO: sample the generated key/value records, and put the numbers below
-    // NOTE(review): 2605 and 2558 are presumably the record counts of the
-    // first and second block - confirm against init().
-    init(Compression.Algorithm.LZO.getName(), "memcmp", 2605, 2558);
-    if (!skip)
-      super.setUp();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
deleted file mode 100644
index 7c6581d..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-/**
- * Runs the test cases inherited from TestTFileStreams with the LZO
- * compression codec; the tests are skipped when LZO is not supported.
- */
-public class TestTFileLzoCodecsStreams extends TestTFileStreams {
-  /**
-   * Test LZO compression codec, using the same test cases as in
-   * TestTFileStreams. Sets the skip flag when Algorithm.LZO.isSupported() is
-   * false and only runs the parent's setUp when the tests are not skipped.
-   */
-  @Override
-  public void setUp() throws IOException {
-    skip = !(Algorithm.LZO.isSupported());
-    if (skip) {
-      System.out.println("Skipped");
-    }
-    init(Compression.Algorithm.LZO.getName(), "memcmp");
-    if (!skip) 
-      super.setUp();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
deleted file mode 100644
index c304743..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-public class TestTFileNoneCodecsByteArrays extends TestDTFileByteArrays {
-  /**
-   * Test non-compression codec, using the same test cases as in the ByteArrays.
-   */
-  @Override
-  public void setUp() throws IOException {
-    init(Compression.Algorithm.NONE.getName(), "memcmp", 24, 24);
-    super.setUp();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
deleted file mode 100644
index 31e3cad..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-/**
- * 
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- * 
- */
-
-public class TestTFileNoneCodecsJClassComparatorByteArrays extends TestDTFileByteArrays {
-  /**
-   * Test non-compression codec, using the same test cases as in the ByteArrays.
-   */
-  @Override
-  public void setUp() throws IOException {
-    init(Compression.Algorithm.NONE.getName(),
-        "jclass: org.apache.hadoop.io.file.tfile.MyComparator", 24, 24);
-    super.setUp();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
deleted file mode 100644
index 06d086b..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-public class TestTFileNoneCodecsStreams extends TestTFileStreams {
-  /**
-   * Test non-compression codec, using the same test cases as in the ByteArrays.
-   */
-  @Override
-  public void setUp() throws IOException {
-    init(Compression.Algorithm.NONE.getName(), "memcmp");
-    super.setUp();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
deleted file mode 100644
index 9f6b3ce..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-
-/**
- * test the performance for seek.
- *
- */
-public class TestTFileSeek extends TestCase { 
-  private MyOptions options;
-  private Configuration conf;
-  private Path path;
-  private FileSystem fs;
-  private NanoTimer timer;
-  private Random rng;
-  private DiscreteRNG keyLenGen;
-  private KVGenerator kvGen;
-
-  @Override
-  public void setUp() throws IOException {
-    if (options == null) {
-      options = new MyOptions(new String[0]);
-    }
-
-    conf = new Configuration();
-    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
-    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
-    path = new Path(new Path(options.rootDir), options.file);
-    fs = path.getFileSystem(conf);
-    timer = new NanoTimer(false);
-    rng = new Random(options.seed);
-    keyLenGen =
-        new RandomDistribution.Zipf(new Random(rng.nextLong()),
-            options.minKeyLen, options.maxKeyLen, 1.2);
-    DiscreteRNG valLenGen =
-        new RandomDistribution.Flat(new Random(rng.nextLong()),
-            options.minValLength, options.maxValLength);
-    DiscreteRNG wordLenGen =
-        new RandomDistribution.Flat(new Random(rng.nextLong()),
-            options.minWordLen, options.maxWordLen);
-    kvGen =
-        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
-            options.dictSize);
-  }
-  
-  @Override
-  public void tearDown() throws IOException {
-    fs.delete(path, true);
-  }
-  
-  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
-    throws IOException {
-    if (fs.exists(name)) {
-      fs.delete(name, true);
-    }
-    FSDataOutputStream fout = fs.create(name);
-    return fout;
-  }
-
-  private void createTFile() throws IOException {
-    long totalBytes = 0;
-    FSDataOutputStream fout = createFSOutput(path, fs);
-    try {
-      Writer writer =
-          new Writer(fout, options.minBlockSize, options.compress, "memcmp",
-              conf);
-      try {
-        BytesWritable key = new BytesWritable();
-        BytesWritable val = new BytesWritable();
-        timer.start();
-        for (long i = 0; true; ++i) {
-          if (i % 1000 == 0) { // test the size for every 1000 rows.
-            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
-              break;
-            }
-          }
-          kvGen.next(key, val, false);
-          writer.append(key.get(), 0, key.getSize(), val.get(), 0, val
-              .getSize());
-          totalBytes += key.getSize();
-          totalBytes += val.getSize();
-        }
-        timer.stop();
-      }
-      finally {
-        writer.close();
-      }
-    }
-    finally {
-      fout.close();
-    }
-    double duration = (double)timer.read()/1000; // in us.
-    long fsize = fs.getFileStatus(path).getLen();
-
-    System.out.printf(
-        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
-        timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
-            / duration);
-    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
-        timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
-  }
-  
-  public void seekTFile() throws IOException {
-    int miss = 0;
-    long totalBytes = 0;
-    FSDataInputStream fsdis = fs.open(path);
-    Reader reader =
-      new Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
-    KeySampler kSampler =
-        new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
-            keyLenGen);
-    Scanner scanner = reader.createScanner();
-    BytesWritable key = new BytesWritable();
-    BytesWritable val = new BytesWritable();
-    timer.reset();
-    timer.start();
-    for (int i = 0; i < options.seekCount; ++i) {
-      kSampler.next(key);
-      scanner.lowerBound(key.get(), 0, key.getSize());
-      if (!scanner.atEnd()) {
-        scanner.entry().get(key, val);
-        totalBytes += key.getSize();
-        totalBytes += val.getSize();
-      }
-      else {
-        ++miss;
-      }
-    }
-    timer.stop();
-    double duration = (double) timer.read() / 1000; // in us.
-    System.out.printf(
-        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
-        timer.toString(), NanoTimer.nanoTimeToString(timer.read()
-            / options.seekCount), options.seekCount - miss, miss,
-        (double) totalBytes / 1024 / (options.seekCount - miss));
-
-  }
-  
-  public void testSeeks() throws IOException {
-    String[] supported = TFile.getSupportedCompressionAlgorithms();
-    boolean proceed = false;
-    for (String c : supported) {
-      if (c.equals(options.compress)) {
-        proceed = true;
-        break;
-      }
-    }
-
-    if (!proceed) {
-      System.out.println("Skipped for " + options.compress);
-      return;
-    }
-
-    if (options.doCreate()) {
-      createTFile();
-    }
-
-    if (options.doRead()) {
-      seekTFile();
-    }
-  }
-  
-  private static class IntegerRange {
-    private final int from, to;
-
-    public IntegerRange(int from, int to) {
-      this.from = from;
-      this.to = to;
-    }
-
-    public static IntegerRange parse(String s) throws ParseException {
-      StringTokenizer st = new StringTokenizer(s, " \t,");
-      if (st.countTokens() != 2) {
-        throw new ParseException("Bad integer specification: " + s);
-      }
-      int from = Integer.parseInt(st.nextToken());
-      int to = Integer.parseInt(st.nextToken());
-      return new IntegerRange(from, to);
-    }
-
-    public int from() {
-      return from;
-    }
-
-    public int to() {
-      return to;
-    }
-  }
-
-  private static class MyOptions {
-    // hard coded constants
-    int dictSize = 1000;
-    int minWordLen = 5;
-    int maxWordLen = 20;
-    int osInputBufferSize = 64 * 1024;
-    int osOutputBufferSize = 64 * 1024;
-    int fsInputBufferSizeNone = 0;
-    int fsInputBufferSizeLzo = 0;
-    int fsInputBufferSizeGz = 0;
-    int fsOutputBufferSizeNone = 1;
-    int fsOutputBufferSizeLzo = 1;
-    int fsOutputBufferSizeGz = 1;
-   
-    String rootDir =
-        System.getProperty("test.build.data", "/tmp/tfile-test");
-    String file = "TestTFileSeek";
-    String compress = "gz";
-    int minKeyLen = 10;
-    int maxKeyLen = 50;
-    int minValLength = 100;
-    int maxValLength = 200;
-    int minBlockSize = 64 * 1024;
-    int fsOutputBufferSize = 1;
-    int fsInputBufferSize = 0;
-    long fileSize = 3 * 1024 * 1024;
-    long seekCount = 1000;
-    long seed;
-
-    static final int OP_CREATE = 1;
-    static final int OP_READ = 2;
-    int op = OP_CREATE | OP_READ;
-
-    boolean proceed = false;
-
-    public MyOptions(String[] args) {
-      seed = System.nanoTime();
-
-      try {
-        Options opts = buildOptions();
-        CommandLineParser parser = new GnuParser();
-        CommandLine line = parser.parse(opts, args, true);
-        processOptions(line, opts);
-        validateOptions();
-      }
-      catch (ParseException e) {
-        System.out.println(e.getMessage());
-        System.out.println("Try \"--help\" option for details.");
-        setStopProceed();
-      }
-    }
-
-    public boolean proceed() {
-      return proceed;
-    }
-
-    private Options buildOptions() {
-      Option compress =
-          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
-              .hasArg().withDescription("compression scheme").create('c');
-
-      Option fileSize =
-          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
-              .hasArg().withDescription("target size of the file (in MB).")
-              .create('s');
-
-      Option fsInputBufferSz =
-          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
-              .hasArg().withDescription(
-                  "size of the file system input buffer (in bytes).").create(
-                  'i');
-
-      Option fsOutputBufferSize =
-          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
-              .hasArg().withDescription(
-                  "size of the file system output buffer (in bytes).").create(
-                  'o');
-
-      Option keyLen =
-          OptionBuilder
-              .withLongOpt("key-length")
-              .withArgName("min,max")
-              .hasArg()
-              .withDescription(
-                  "the length range of the key (in bytes)")
-              .create('k');
-
-      Option valueLen =
-          OptionBuilder
-              .withLongOpt("value-length")
-              .withArgName("min,max")
-              .hasArg()
-              .withDescription(
-                  "the length range of the value (in bytes)")
-              .create('v');
-
-      Option blockSz =
-          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
-              .withDescription("minimum block size (in KB)").create('b');
-
-      Option seed =
-          OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
-              .withDescription("specify the seed").create('S');
-
-      Option operation =
-          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
-              .withDescription(
-                  "action: seek-only, create-only, seek-after-create").create(
-                  'x');
-
-      Option rootDir =
-          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
-              .withDescription(
-                  "specify root directory where files will be created.")
-              .create('r');
-
-      Option file =
-          OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
-              .withDescription("specify the file name to be created or read.")
-              .create('f');
-
-      Option seekCount =
-          OptionBuilder
-              .withLongOpt("seek")
-              .withArgName("count")
-              .hasArg()
-              .withDescription(
-                  "specify how many seek operations we perform (requires -x r or -x rw.")
-              .create('n');
-
-      Option help =
-          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
-              "show this screen").create("h");
-
-      return new Options().addOption(compress).addOption(fileSize).addOption(
-          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
-          .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption(
-              operation).addOption(seekCount).addOption(file).addOption(help);
-
-    }
-
-    private void processOptions(CommandLine line, Options opts)
-        throws ParseException {
-      // --help -h and --version -V must be processed first.
-      if (line.hasOption('h')) {
-        HelpFormatter formatter = new HelpFormatter();
-        System.out.println("TFile and SeqFile benchmark.");
-        System.out.println();
-        formatter.printHelp(100,
-            "java ... TestTFileSeqFileComparison [options]",
-            "\nSupported options:", opts, "");
-        return;
-      }
-
-      if (line.hasOption('c')) {
-        compress = line.getOptionValue('c');
-      }
-
-      if (line.hasOption('d')) {
-        dictSize = Integer.parseInt(line.getOptionValue('d'));
-      }
-
-      if (line.hasOption('s')) {
-        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
-      }
-
-      if (line.hasOption('i')) {
-        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
-      }
-
-      if (line.hasOption('o')) {
-        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
-      }
-      
-      if (line.hasOption('n')) {
-        seekCount = Integer.parseInt(line.getOptionValue('n'));
-      }
-
-      if (line.hasOption('k')) {
-        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
-        minKeyLen = ir.from();
-        maxKeyLen = ir.to();
-      }
-
-      if (line.hasOption('v')) {
-        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
-        minValLength = ir.from();
-        maxValLength = ir.to();
-      }
-
-      if (line.hasOption('b')) {
-        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
-      }
-
-      if (line.hasOption('r')) {
-        rootDir = line.getOptionValue('r');
-      }
-      
-      if (line.hasOption('f')) {
-        file = line.getOptionValue('f');
-      }
-
-      if (line.hasOption('S')) {
-        seed = Long.parseLong(line.getOptionValue('S'));
-      }
-
-      if (line.hasOption('x')) {
-        String strOp = line.getOptionValue('x');
-        if (strOp.equals("r")) {
-          op = OP_READ;
-        }
-        else if (strOp.equals("w")) {
-          op = OP_CREATE;
-        }
-        else if (strOp.equals("rw")) {
-          op = OP_CREATE | OP_READ;
-        }
-        else {
-          throw new ParseException("Unknown action specifier: " + strOp);
-        }
-      }
-
-      proceed = true;
-    }
-
-    private void validateOptions() throws ParseException {
-      if (!compress.equals("none") && !compress.equals("lzo")
-          && !compress.equals("gz")) {
-        throw new ParseException("Unknown compression scheme: " + compress);
-      }
-
-      if (minKeyLen >= maxKeyLen) {
-        throw new ParseException(
-            "Max key length must be greater than min key length.");
-      }
-
-      if (minValLength >= maxValLength) {
-        throw new ParseException(
-            "Max value length must be greater than min value length.");
-      }
-
-      if (minWordLen >= maxWordLen) {
-        throw new ParseException(
-            "Max word length must be greater than min word length.");
-      }
-      return;
-    }
-
-    private void setStopProceed() {
-      proceed = false;
-    }
-
-    public boolean doCreate() {
-      return (op & OP_CREATE) != 0;
-    }
-
-    public boolean doRead() {
-      return (op & OP_READ) != 0;
-    }
-  }
-  
-  public static void main(String[] argv) throws IOException {
-    TestTFileSeek testCase = new TestTFileSeek();
-    MyOptions options = new MyOptions(argv);
-    
-    if (options.proceed == false) {
-      return;
-    }
-
-    testCase.options = options;
-    testCase.setUp();
-    testCase.testSeeks();
-    testCase.tearDown();
-  }
-}