Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:04 UTC

[5/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
deleted file mode 100644
index e513ccd..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.util.Time;
-
-public class TestTFileSeqFileComparison extends TestCase {
-  MyOptions options;
-
-  private FileSystem fs;
-  private Configuration conf;
-  private long startTimeEpoch;
-  private long finishTimeEpoch;
-  private DateFormat formatter;
-  byte[][] dictionary;
-
-  @Override
-  public void setUp() throws IOException {
-    if (options == null) {
-      options = new MyOptions(new String[0]);
-    }
-
-    conf = new Configuration();
-    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
-    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
-    Path path = new Path(options.rootDir);
-    fs = path.getFileSystem(conf);
-    formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    setUpDictionary();
-  }
-
-  private void setUpDictionary() {
-    Random rng = new Random();
-    dictionary = new byte[options.dictSize][];
-    for (int i = 0; i < options.dictSize; ++i) {
-      int len =
-          rng.nextInt(options.maxWordLen - options.minWordLen)
-              + options.minWordLen;
-      dictionary[i] = new byte[len];
-      rng.nextBytes(dictionary[i]);
-    }
-  }
-
-  @Override
-  public void tearDown() throws IOException {
-    // do nothing
-  }
-
-  public void startTime() throws IOException {
-    startTimeEpoch = Time.now();
-    System.out.println(formatTime() + " Started timing.");
-  }
-
-  public void stopTime() throws IOException {
-    finishTimeEpoch = Time.now();
-    System.out.println(formatTime() + " Stopped timing.");
-  }
-
-  public long getIntervalMillis() throws IOException {
-    return finishTimeEpoch - startTimeEpoch;
-  }
-
-  public void printlnWithTimestamp(String message) throws IOException {
-    System.out.println(formatTime() + "  " + message);
-  }
-
-  /*
-   * Format epoch milliseconds as a human-readable timestamp.
-   */
-  public String formatTime(long millis) {
-    return formatter.format(millis);
-  }
-
-  public String formatTime() {
-    return formatTime(Time.now());
-  }
-
-  private interface KVAppendable {
-    public void append(BytesWritable key, BytesWritable value)
-        throws IOException;
-
-    public void close() throws IOException;
-  }
-
-  private interface KVReadable {
-    public byte[] getKey();
-
-    public byte[] getValue();
-
-    public int getKeyLength();
-
-    public int getValueLength();
-
-    public boolean next() throws IOException;
-
-    public void close() throws IOException;
-  }
-
-  static class TFileAppendable implements KVAppendable {
-    private FSDataOutputStream fsdos;
-    private TFile.Writer writer;
-
-    public TFileAppendable(FileSystem fs, Path path, String compress,
-        int minBlkSize, int osBufferSize, Configuration conf)
-        throws IOException {
-      this.fsdos = fs.create(path, true, osBufferSize);
-      this.writer = new TFile.Writer(fsdos, minBlkSize, compress, null, conf);
-    }
-
-    @Override
-    public void append(BytesWritable key, BytesWritable value)
-        throws IOException {
-      writer.append(key.get(), 0, key.getSize(), value.get(), 0, value
-          .getSize());
-    }
-
-    @Override
-    public void close() throws IOException {
-      writer.close();
-      fsdos.close();
-    }
-  }
-
-  static class TFileReadable implements KVReadable {
-    private FSDataInputStream fsdis;
-    private DTFile.Reader reader;
-    private DTFile.Reader.Scanner scanner;
-    private byte[] keyBuffer;
-    private int keyLength;
-    private byte[] valueBuffer;
-    private int valueLength;
-
-    public TFileReadable(FileSystem fs, Path path, int osBufferSize,
-        Configuration conf) throws IOException {
-      this.fsdis = fs.open(path, osBufferSize);
-      this.reader =
-          new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
-      this.scanner = reader.createScanner();
-      keyBuffer = new byte[32];
-      valueBuffer = new byte[32];
-    }
-
-    private void checkKeyBuffer(int size) {
-      if (size <= keyBuffer.length) {
-        return;
-      }
-      keyBuffer =
-          new byte[Math.max(2 * keyBuffer.length, 2 * size - keyBuffer.length)];
-    }
-
-    private void checkValueBuffer(int size) {
-      if (size <= valueBuffer.length) {
-        return;
-      }
-      valueBuffer =
-          new byte[Math.max(2 * valueBuffer.length, 2 * size
-              - valueBuffer.length)];
-    }
-
-    @Override
-    public byte[] getKey() {
-      return keyBuffer;
-    }
-
-    @Override
-    public int getKeyLength() {
-      return keyLength;
-    }
-
-    @Override
-    public byte[] getValue() {
-      return valueBuffer;
-    }
-
-    @Override
-    public int getValueLength() {
-      return valueLength;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      if (scanner.atEnd()) return false;
-      Entry entry = scanner.entry();
-      keyLength = entry.getKeyLength();
-      checkKeyBuffer(keyLength);
-      entry.getKey(keyBuffer);
-      valueLength = entry.getValueLength();
-      checkValueBuffer(valueLength);
-      entry.getValue(valueBuffer);
-      scanner.advance();
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-      scanner.close();
-      reader.close();
-      fsdis.close();
-    }
-  }
-
-  static class SeqFileAppendable implements KVAppendable {
-    private FSDataOutputStream fsdos;
-    private SequenceFile.Writer writer;
-
-    public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
-        String compress, int minBlkSize) throws IOException {
-      Configuration conf = new Configuration();
-      conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
-                      true);
-
-      CompressionCodec codec = null;
-      if ("lzo".equals(compress)) {
-        codec = Compression.Algorithm.LZO.getCodec();
-      }
-      else if ("gz".equals(compress)) {
-        codec = Compression.Algorithm.GZ.getCodec();
-      }
-      else if (!"none".equals(compress))
-        throw new IOException("Codec not supported.");
-
-      this.fsdos = fs.create(path, true, osBufferSize);
-
-      if (!"none".equals(compress)) {
-        writer =
-            SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
-                BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
-      }
-      else {
-        writer =
-            SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
-                BytesWritable.class, SequenceFile.CompressionType.NONE, null);
-      }
-    }
-
-    @Override
-    public void append(BytesWritable key, BytesWritable value)
-        throws IOException {
-      writer.append(key, value);
-    }
-
-    @Override
-    public void close() throws IOException {
-      writer.close();
-      fsdos.close();
-    }
-  }
-
-  static class SeqFileReadable implements KVReadable {
-    private SequenceFile.Reader reader;
-    private BytesWritable key;
-    private BytesWritable value;
-
-    public SeqFileReadable(FileSystem fs, Path path, int osBufferSize)
-        throws IOException {
-      Configuration conf = new Configuration();
-      conf.setInt("io.file.buffer.size", osBufferSize);
-      reader = new SequenceFile.Reader(fs, path, conf);
-      key = new BytesWritable();
-      value = new BytesWritable();
-    }
-
-    @Override
-    public byte[] getKey() {
-      return key.get();
-    }
-
-    @Override
-    public int getKeyLength() {
-      return key.getSize();
-    }
-
-    @Override
-    public byte[] getValue() {
-      return value.get();
-    }
-
-    @Override
-    public int getValueLength() {
-      return value.getSize();
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      return reader.next(key, value);
-    }
-
-    @Override
-    public void close() throws IOException {
-      reader.close();
-    }
-  }
-
-  private void reportStats(Path path, long totalBytes) throws IOException {
-    long duration = getIntervalMillis();
-    long fsize = fs.getFileStatus(path).getLen();
-    printlnWithTimestamp(String.format(
-        "Duration: %dms...total size: %.2fMB...raw thrpt: %.2fMB/s", duration,
-        (double) totalBytes / 1024 / 1024, (double) totalBytes / duration
-            * 1000 / 1024 / 1024));
-    printlnWithTimestamp(String.format(
-        "Compressed size: %.2fMB...compressed thrpt: %.2fMB/s.",
-        (double) fsize / 1024 / 1024, (double) fsize / duration * 1000 / 1024
-            / 1024));
-  }
-
-  private void fillBuffer(Random rng, BytesWritable bw, byte[] tmp, int len) {
-    int n = 0;
-    while (n < len) {
-      byte[] word = dictionary[rng.nextInt(dictionary.length)];
-      int l = Math.min(word.length, len - n);
-      System.arraycopy(word, 0, tmp, n, l);
-      n += l;
-    }
-    bw.set(tmp, 0, len);
-  }
-
-  private void timeWrite(Path path, KVAppendable appendable, int baseKlen,
-      int baseVlen, long fileSize) throws IOException {
-    int maxKlen = baseKlen * 2;
-    int maxVlen = baseVlen * 2;
-    BytesWritable key = new BytesWritable();
-    BytesWritable value = new BytesWritable();
-    byte[] keyBuffer = new byte[maxKlen];
-    byte[] valueBuffer = new byte[maxVlen];
-    Random rng = new Random(options.seed);
-    long totalBytes = 0;
-    printlnWithTimestamp("Start writing: " + path.getName() + "...");
-    startTime();
-
-    for (long i = 0; true; ++i) {
-      if (i % 1000 == 0) { // test the size for every 1000 rows.
-        if (fs.getFileStatus(path).getLen() >= fileSize) {
-          break;
-        }
-      }
-      int klen = rng.nextInt(baseKlen) + baseKlen;
-      int vlen = rng.nextInt(baseVlen) + baseVlen;
-      fillBuffer(rng, key, keyBuffer, klen);
-      fillBuffer(rng, value, valueBuffer, vlen);
-      key.set(keyBuffer, 0, klen);
-      value.set(valueBuffer, 0, vlen);
-      appendable.append(key, value);
-      totalBytes += klen;
-      totalBytes += vlen;
-    }
-    stopTime();
-    appendable.close();
-    reportStats(path, totalBytes);
-  }
-
-  private void timeRead(Path path, KVReadable readable) throws IOException {
-    printlnWithTimestamp("Start reading: " + path.getName() + "...");
-    long totalBytes = 0;
-    startTime();
-    while (readable.next()) {
-      totalBytes += readable.getKeyLength();
-      totalBytes += readable.getValueLength();
-    }
-    stopTime();
-    readable.close();
-    reportStats(path, totalBytes);
-  }
-
-  private void createTFile(String parameters, String compress)
-      throws IOException {
-    System.out.println("=== TFile: Creation (" + parameters + ") === ");
-    Path path = new Path(options.rootDir, "TFile.Performance");
-    KVAppendable appendable =
-        new TFileAppendable(fs, path, compress, options.minBlockSize,
-            options.osOutputBufferSize, conf);
-    timeWrite(path, appendable, options.keyLength, options.valueLength,
-        options.fileSize);
-  }
-
-  private void readTFile(String parameters, boolean delFile) throws IOException {
-    System.out.println("=== TFile: Reading (" + parameters + ") === ");
-    {
-      Path path = new Path(options.rootDir, "TFile.Performance");
-      KVReadable readable =
-          new TFileReadable(fs, path, options.osInputBufferSize, conf);
-      timeRead(path, readable);
-      if (delFile) {
-        if (fs.exists(path)) {
-          fs.delete(path, true);
-        }
-      }
-    }
-  }
-
-  private void createSeqFile(String parameters, String compress)
-      throws IOException {
-    System.out.println("=== SeqFile: Creation (" + parameters + ") === ");
-    Path path = new Path(options.rootDir, "SeqFile.Performance");
-    KVAppendable appendable =
-        new SeqFileAppendable(fs, path, options.osOutputBufferSize, compress,
-            options.minBlockSize);
-    timeWrite(path, appendable, options.keyLength, options.valueLength,
-        options.fileSize);
-  }
-
-  private void readSeqFile(String parameters, boolean delFile)
-      throws IOException {
-    System.out.println("=== SeqFile: Reading (" + parameters + ") === ");
-    Path path = new Path(options.rootDir, "SeqFile.Performance");
-    KVReadable readable =
-        new SeqFileReadable(fs, path, options.osInputBufferSize);
-    timeRead(path, readable);
-    if (delFile) {
-      if (fs.exists(path)) {
-        fs.delete(path, true);
-      }
-    }
-  }
-
-  private void compareRun(String compress) throws IOException {
-    String[] supported = TFile.getSupportedCompressionAlgorithms();
-    boolean proceed = false;
-    for (String c : supported) {
-      if (c.equals(compress)) {
-        proceed = true;
-        break;
-      }
-    }
-
-    if (!proceed) {
-      System.out.println("Skipped for " + compress);
-      return;
-    }
-    
-    options.compress = compress;
-    String parameters = parameters2String(options);
-    createSeqFile(parameters, compress);
-    readSeqFile(parameters, true);
-    createTFile(parameters, compress);
-    readTFile(parameters, true);
-    createTFile(parameters, compress);
-    readTFile(parameters, true);
-    createSeqFile(parameters, compress);
-    readSeqFile(parameters, true);
-  }
-
-  public void testRunComparisons() throws IOException {
-    String[] compresses = new String[] { "none", "lzo", "gz" };
-    for (String compress : compresses) {
-      if (compress.equals("none")) {
-        conf
-            .setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeNone);
-        conf.setInt("tfile.fs.output.buffer.size",
-            options.fsOutputBufferSizeNone);
-      }
-      else if (compress.equals("lzo")) {
-        conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeLzo);
-        conf.setInt("tfile.fs.output.buffer.size",
-            options.fsOutputBufferSizeLzo);
-      }
-      else {
-        conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeGz);
-        conf
-            .setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSizeGz);
-      }
-      compareRun(compress);
-    }
-  }
-
-  private static String parameters2String(MyOptions options) {
-    return String
-        .format(
-            "KLEN: %d-%d... VLEN: %d-%d...MinBlkSize: %.2fKB...Target Size: %.2fMB...Compression: ...%s",
-            options.keyLength, options.keyLength * 2, options.valueLength,
-            options.valueLength * 2, (double) options.minBlockSize / 1024,
-            (double) options.fileSize / 1024 / 1024, options.compress);
-  }
-
-  private static class MyOptions {
-    String rootDir =
-        System
-            .getProperty("test.build.data", "/tmp/tfile-test");
-    String compress = "gz";
-    String format = "tfile";
-    int dictSize = 1000;
-    int minWordLen = 5;
-    int maxWordLen = 20;
-    int keyLength = 50;
-    int valueLength = 100;
-    int minBlockSize = 256 * 1024;
-    int fsOutputBufferSize = 1;
-    int fsInputBufferSize = 0;
-    // special variables only for unit testing.
-    int fsInputBufferSizeNone = 0;
-    int fsInputBufferSizeGz = 0;
-    int fsInputBufferSizeLzo = 0;
-    int fsOutputBufferSizeNone = 1;
-    int fsOutputBufferSizeGz = 1;
-    int fsOutputBufferSizeLzo = 1;
-
-    // un-exposed parameters.
-    int osInputBufferSize = 64 * 1024;
-    int osOutputBufferSize = 64 * 1024;
-
-    long fileSize = 3 * 1024 * 1024;
-    long seed;
-
-    static final int OP_CREATE = 1;
-    static final int OP_READ = 2;
-    int op = OP_READ;
-
-    boolean proceed = false;
-
-    public MyOptions(String[] args) {
-      seed = System.nanoTime();
-
-      try {
-        Options opts = buildOptions();
-        CommandLineParser parser = new GnuParser();
-        CommandLine line = parser.parse(opts, args, true);
-        processOptions(line, opts);
-        validateOptions();
-      }
-      catch (ParseException e) {
-        System.out.println(e.getMessage());
-        System.out.println("Try \"--help\" option for details.");
-        setStopProceed();
-      }
-    }
-
-    public boolean proceed() {
-      return proceed;
-    }
-
-    private Options buildOptions() {
-      Option compress =
-          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
-              .hasArg().withDescription("compression scheme").create('c');
-
-      Option dictSize =
-          OptionBuilder.withLongOpt("dict").withArgName("size").hasArg()
-              .withDescription("number of dictionary entries").create('d');
-
-      Option fileSize =
-          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
-              .hasArg().withDescription("target size of the file (in MB).")
-              .create('s');
-
-      Option format =
-          OptionBuilder.withLongOpt("format").withArgName("[tfile|seqfile]")
-              .hasArg().withDescription("choose TFile or SeqFile").create('f');
-
-      Option fsInputBufferSz =
-          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
-              .hasArg().withDescription(
-                  "size of the file system input buffer (in bytes).").create(
-                  'i');
-
-      Option fsOutputBufferSize =
-          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
-              .hasArg().withDescription(
-                  "size of the file system output buffer (in bytes).").create(
-                  'o');
-
-      Option keyLen =
-          OptionBuilder
-              .withLongOpt("key-length")
-              .withArgName("length")
-              .hasArg()
-              .withDescription(
-                  "base length of the key (in bytes), actual length varies in [base, 2*base)")
-              .create('k');
-
-      Option valueLen =
-          OptionBuilder
-              .withLongOpt("value-length")
-              .withArgName("length")
-              .hasArg()
-              .withDescription(
-                  "base length of the value (in bytes), actual length varies in [base, 2*base)")
-              .create('v');
-
-      Option wordLen =
-          OptionBuilder.withLongOpt("word-length").withArgName("min,max")
-              .hasArg().withDescription(
-                  "range of dictionary word length (in bytes)").create('w');
-
-      Option blockSz =
-          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
-              .withDescription("minimum block size (in KB)").create('b');
-
-      Option seed =
-          OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
-              .withDescription("specify the seed").create('S');
-
-      Option operation =
-          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
-              .withDescription(
-                  "action: read-only, create-only, read-after-create").create(
-                  'x');
-
-      Option rootDir =
-          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
-              .withDescription(
-                  "specify root directory where files will be created.")
-              .create('r');
-
-      Option help =
-          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
-              "show this screen").create("h");
-
-      return new Options().addOption(compress).addOption(dictSize).addOption(
-          fileSize).addOption(format).addOption(fsInputBufferSz).addOption(
-          fsOutputBufferSize).addOption(keyLen).addOption(wordLen).addOption(
-          blockSz).addOption(rootDir).addOption(valueLen).addOption(operation)
-          .addOption(help);
-
-    }
-
-    private void processOptions(CommandLine line, Options opts)
-        throws ParseException {
-      // --help -h and --version -V must be processed first.
-      if (line.hasOption('h')) {
-        HelpFormatter formatter = new HelpFormatter();
-        System.out.println("TFile and SeqFile benchmark.");
-        System.out.println();
-        formatter.printHelp(100,
-            "java ... TestTFileSeqFileComparison [options]",
-            "\nSupported options:", opts, "");
-        return;
-      }
-
-      if (line.hasOption('c')) {
-        compress = line.getOptionValue('c');
-      }
-
-      if (line.hasOption('d')) {
-        dictSize = Integer.parseInt(line.getOptionValue('d'));
-      }
-
-      if (line.hasOption('s')) {
-        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
-      }
-
-      if (line.hasOption('f')) {
-        format = line.getOptionValue('f');
-      }
-
-      if (line.hasOption('i')) {
-        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
-      }
-
-      if (line.hasOption('o')) {
-        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
-      }
-
-      if (line.hasOption('k')) {
-        keyLength = Integer.parseInt(line.getOptionValue('k'));
-      }
-
-      if (line.hasOption('v')) {
-        valueLength = Integer.parseInt(line.getOptionValue('v'));
-      }
-
-      if (line.hasOption('b')) {
-        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
-      }
-
-      if (line.hasOption('r')) {
-        rootDir = line.getOptionValue('r');
-      }
-
-      if (line.hasOption('S')) {
-        seed = Long.parseLong(line.getOptionValue('S'));
-      }
-
-      if (line.hasOption('w')) {
-        String min_max = line.getOptionValue('w');
-        StringTokenizer st = new StringTokenizer(min_max, " \t,");
-        if (st.countTokens() != 2) {
-          throw new ParseException("Bad word length specification: " + min_max);
-        }
-        minWordLen = Integer.parseInt(st.nextToken());
-        maxWordLen = Integer.parseInt(st.nextToken());
-      }
-
-      if (line.hasOption('x')) {
-        String strOp = line.getOptionValue('x');
-        if (strOp.equals("r")) {
-          op = OP_READ;
-        }
-        else if (strOp.equals("w")) {
-          op = OP_CREATE;
-        }
-        else if (strOp.equals("rw")) {
-          op = OP_CREATE | OP_READ;
-        }
-        else {
-          throw new ParseException("Unknown action specifier: " + strOp);
-        }
-      }
-
-      proceed = true;
-    }
-
-    private void validateOptions() throws ParseException {
-      if (!compress.equals("none") && !compress.equals("lzo")
-          && !compress.equals("gz")) {
-        throw new ParseException("Unknown compression scheme: " + compress);
-      }
-
-      if (!format.equals("tfile") && !format.equals("seqfile")) {
-        throw new ParseException("Unknown file format: " + format);
-      }
-
-      if (minWordLen >= maxWordLen) {
-        throw new ParseException(
-            "Max word length must be greater than min word length.");
-      }
-      return;
-    }
-
-    private void setStopProceed() {
-      proceed = false;
-    }
-
-    public boolean doCreate() {
-      return (op & OP_CREATE) != 0;
-    }
-
-    public boolean doRead() {
-      return (op & OP_READ) != 0;
-    }
-  }
-
-  public static void main(String[] args) throws IOException {
-    TestTFileSeqFileComparison testCase = new TestTFileSeqFileComparison();
-    MyOptions options = new MyOptions(args);
-    if (!options.proceed) {
-      return;
-    }
-    testCase.options = options;
-    String parameters = parameters2String(options);
-
-    testCase.setUp();
-    if (testCase.options.format.equals("tfile")) {
-      if (options.doCreate()) {
-        testCase.createTFile(parameters, options.compress);
-      }
-      if (options.doRead()) {
-        testCase.readTFile(parameters, options.doCreate());
-      }
-    }
-    else {
-      if (options.doCreate()) {
-        testCase.createSeqFile(parameters, options.compress);
-      }
-      if (options.doRead()) {
-        testCase.readSeqFile(parameters, options.doCreate());
-      }
-    }
-    testCase.tearDown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
deleted file mode 100644
index aad563d..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-public class TestTFileSplit extends TestCase {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private final static int BLOCK_SIZE = 64 * 1024;
-
-  private static final String KEY = "key";
-  private static final String VALUE = "value";
-
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private Random random = new Random();
-
-  private String comparator = "memcmp";
-  private String outputFile = "TestTFileSplit";
-
-  void createFile(int count, String compress) throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile + "." + compress);
-    fs = path.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(path);
-    Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf);
-
-    int nx;
-    for (nx = 0; nx < count; nx++) {
-      byte[] key = composeSortedKey(KEY, count, nx).getBytes();
-      byte[] value = (VALUE + nx).getBytes();
-      writer.append(key, value);
-    }
-    writer.close();
-    out.close();
-  }
-
-  void readFile() throws IOException {
-    long fileLength = fs.getFileStatus(path).getLen();
-    int numSplit = 10;
-    long splitSize = fileLength / numSplit + 1;
-
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    long offset = 0;
-    long rowCount = 0;
-    BytesWritable key, value;
-    for (int i = 0; i < numSplit; ++i, offset += splitSize) {
-      Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
-      int count = 0;
-      key = new BytesWritable();
-      value = new BytesWritable();
-      while (!scanner.atEnd()) {
-        scanner.entry().get(key, value);
-        ++count;
-        scanner.advance();
-      }
-      scanner.close();
-      Assert.assertTrue(count > 0);
-      rowCount += count;
-    }
-    Assert.assertEquals(rowCount, reader.getEntryCount());
-    reader.close();
-  }
-
-  /* Similar to readFile(), but tests scanners created by record
-   * number rather than by byte offset.
-   */
-  void readRowSplits(int numSplits) throws IOException {
-
-    Reader reader =
-      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    
-    long totalRecords = reader.getEntryCount();
-    for (int i=0; i<numSplits; i++) {
-      long startRec = i*totalRecords/numSplits;
-      long endRec = (i+1)*totalRecords/numSplits;
-      if (i == numSplits-1) {
-        endRec = totalRecords;
-      }
-      Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
-      int count = 0;
-      BytesWritable key = new BytesWritable();
-      BytesWritable value = new BytesWritable();
-      long x=startRec;
-      while (!scanner.atEnd()) {
-        assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
-        scanner.entry().get(key, value);
-        ++count;
-        assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
-        scanner.advance();
-        ++x;
-      }
-      scanner.close();
-      Assert.assertTrue(count == (endRec - startRec));
-    }
-    // make sure a range starting at the end yields zero records.
-    Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
-    Assert.assertTrue(scanner.atEnd());
-  }
-  
-  static String composeSortedKey(String prefix, int total, int value) {
-    return String.format("%s%010d", prefix, value);
-  }
-  
-  void checkRecNums() throws IOException {
-    long fileLen = fs.getFileStatus(path).getLen();
-    Reader reader = new Reader(fs.open(path), fileLen, conf);
-    long totalRecs = reader.getEntryCount();
-    long begin = random.nextLong() % (totalRecs / 2);
-    if (begin < 0)
-      begin += (totalRecs / 2);
-    long end = random.nextLong() % (totalRecs / 2);
-    if (end < 0)
-      end += (totalRecs / 2);
-    end += (totalRecs / 2) + 1;
-
-    assertEquals("RecNum for offset=0 should be 0", 0, reader
-        .getRecordNumNear(0));
-    for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) {
-      assertEquals("RecNum for offset>=fileLen should be total entries",
-          totalRecs, reader.getRecordNumNear(x));
-    }
-
-    for (long i = 0; i < 100; ++i) {
-      assertEquals("Locaton to RecNum conversion not symmetric", i, reader
-          .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
-    }
-
-    for (long i = 1; i < 100; ++i) {
-      long x = totalRecs - i;
-      assertEquals("Locaton to RecNum conversion not symmetric", x, reader
-          .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
-    }
-
-    for (long i = begin; i < end; ++i) {
-      assertEquals("Locaton to RecNum conversion not symmetric", i, reader
-          .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
-    }
-
-    for (int i = 0; i < 1000; ++i) {
-      long x = random.nextLong() % totalRecs;
-      if (x < 0) x += totalRecs;
-      assertEquals("Locaton to RecNum conversion not symmetric", x, reader
-          .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
-    }
-  }
-  
-  public void testSplit() throws IOException {
-    System.out.println("testSplit");
-    createFile(100000, Compression.Algorithm.NONE.getName());
-    checkRecNums();   
-    readFile();
-    readRowSplits(10);
-    fs.delete(path, true);
-    createFile(500000, Compression.Algorithm.GZ.getName());
-    checkRecNums();
-    readFile();
-    readRowSplits(83);
-    fs.delete(path, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
deleted file mode 100644
index 2e0506c..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-/**
- * Streaming-interface test case using the GZ compression codec; serves as the
- * base class for the none- and LZO-compression test classes.
- */
-
-public class TestTFileStreams extends TestCase {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private final static int BLOCK_SIZE = 512;
-  private final static int K = 1024;
-  private final static int M = K * K;
-  protected boolean skip = false;
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private FSDataOutputStream out;
-  Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String comparator = "memcmp";
-  private final String outputFile = getClass().getSimpleName();
-
-  public void init(String compression, String comparator) {
-    this.compression = compression;
-    this.comparator = comparator;
-  }
-
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
-  }
-
-  @Override
-  public void tearDown() throws IOException {
-    if (!skip) {
-      try {
-        closeOutput();
-      } catch (Exception e) {
-        // no-op
-      }
-      fs.delete(path, true);
-    }
-  }
-
-  public void testNoEntry() throws IOException {
-    if (skip)
-      return;
-    closeOutput();
-    TestDTFileByteArrays.readRecords(fs, path, 0, conf);
-  }
-
-  public void testOneEntryKnownLength() throws IOException {
-    if (skip)
-      return;
-    writeRecords(1, true, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  public void testOneEntryUnknownLength() throws IOException {
-    if (skip)
-      return;
-    writeRecords(1, false, false);
-
-    // TODO: getValueLength throws an exception, which is inconsistent:
-    // getKeyLength returns a correct value even though the initial length is -1
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  // known key length, unknown value length
-  public void testOneEntryMixedLengths1() throws IOException {
-    if (skip)
-      return;
-    writeRecords(1, true, false);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  // unknown key length, known value length
-  public void testOneEntryMixedLengths2() throws IOException {
-    if (skip)
-      return;
-    writeRecords(1, false, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  public void testTwoEntriesKnownLength() throws IOException {
-    if (skip)
-      return;
-    writeRecords(2, true, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 2, conf);
-  }
-
-  // Negative test
-  public void testFailureAddKeyWithoutValue() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream dos = writer.prepareAppendKey(-1);
-    dos.write("key0".getBytes());
-    try {
-      closeOutput();
-      fail("Cannot add only a key without a value. ");
-    }
-    catch (IllegalStateException e) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureAddValueWithoutKey() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outValue = null;
-    try {
-      outValue = writer.prepareAppendValue(6);
-      outValue.write("value0".getBytes());
-      fail("Cannot add a value without adding key first. ");
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-    finally {
-      if (outValue != null) {
-        outValue.close();
-      }
-    }
-  }
-
-  public void testFailureOneEntryKnownLength() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(2);
-    try {
-      outKey.write("key0".getBytes());
-      fail("Specified key length mismatched the actual key length.");
-    }
-    catch (IOException e) {
-      // noop, expecting an exception
-    }
-
-    DataOutputStream outValue = null;
-    try {
-      outValue = writer.prepareAppendValue(6);
-      outValue.write("value0".getBytes());
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureKeyTooLong() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(2);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-      Assert.fail("Key is longer than requested.");
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-    finally {
-    }
-  }
-
-  public void testFailureKeyTooShort() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    outKey.write("key0".getBytes());
-    outKey.close();
-    DataOutputStream outValue = writer.prepareAppendValue(15);
-    try {
-      outValue.write("value0".getBytes());
-      outValue.close();
-      Assert.fail("Value is shorter than expected.");
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-    finally {
-    }
-  }
-
-  public void testFailureValueTooLong() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    outKey.write("key0".getBytes());
-    outKey.close();
-    DataOutputStream outValue = writer.prepareAppendValue(3);
-    try {
-      outValue.write("value0".getBytes());
-      outValue.close();
-      Assert.fail("Value is longer than expected.");
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-
-    try {
-      outKey.close();
-      outKey.close();
-    }
-    catch (Exception e) {
-      Assert.fail("Second or more close() should have no effect.");
-    }
-  }
-
-  public void testFailureValueTooShort() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(8);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-      Assert.fail("Key is shorter than expected.");
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-    finally {
-    }
-  }
-
-  public void testFailureCloseKeyStreamManyTimesInWriter() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-    }
-    catch (Exception e) {
-      // noop, expecting an exception
-    }
-    finally {
-      try {
-        outKey.close();
-      }
-      catch (Exception e) {
-        // no-op
-      }
-    }
-    outKey.close();
-    outKey.close();
-    Assert.assertTrue("Multiple close should have no effect.", true);
-  }
-
-  public void testFailureKeyLongerThan64K() throws IOException {
-    if (skip)
-      return;
-    try {
-      DataOutputStream outKey = writer.prepareAppendKey(64 * K + 1);
-      Assert.fail("Failed to handle key longer than 64K.");
-    }
-    catch (IndexOutOfBoundsException e) {
-      // noop, expecting exceptions
-    }
-    closeOutput();
-  }
-
-  public void testFailureKeyLongerThan64K_2() throws IOException {
-    if (skip)
-      return;
-    DataOutputStream outKey = writer.prepareAppendKey(-1);
-    try {
-      byte[] buf = new byte[K];
-      Random rand = new Random();
-      for (int nx = 0; nx < K + 2; nx++) {
-        rand.nextBytes(buf);
-        outKey.write(buf);
-      }
-      outKey.close();
-      Assert.fail("Failed to handle key longer than 64K.");
-    }
-    catch (EOFException e) {
-      // noop, expecting exceptions
-    }
-    finally {
-      try {
-        closeOutput();
-      }
-      catch (Exception e) {
-        // no-op
-      }
-    }
-  }
-
-  public void testFailureNegativeOffset() throws IOException {
-    if (skip)
-      return;
-    writeRecords(2, true, true);
-
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-    byte[] buf = new byte[K];
-    try {
-      scanner.entry().getKey(buf, -1);
-      Assert.fail("Failed to handle key negative offset.");
-    }
-    catch (Exception e) {
-      // noop, expecting exceptions
-    }
-    finally {
-    }
-    scanner.close();
-    reader.close();
-  }
-
-  /**
-   * Verify that the compressed data size is less than raw data size.
-   * 
-   * @throws IOException
-   */
-  public void testFailureCompressionNotWorking() throws IOException {
-    if (skip)
-      return;
-    long rawDataSize = writeRecords(10000, false, false, false);
-    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
-      Assert.assertTrue(out.getPos() < rawDataSize);
-    }
-    closeOutput();
-  }
-
-  public void testFailureCompressionNotWorking2() throws IOException {
-    if (skip)
-      return;
-    long rawDataSize = writeRecords(10000, true, true, false);
-    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
-      Assert.assertTrue(out.getPos() < rawDataSize);
-    }
-    closeOutput();
-  }
-
-  private long writeRecords(int count, boolean knownKeyLength,
-      boolean knownValueLength, boolean close) throws IOException {
-    long rawDataSize = 0;
-    for (int nx = 0; nx < count; nx++) {
-      String key = TestDTFileByteArrays.composeSortedKey("key", nx);
-      DataOutputStream outKey =
-          writer.prepareAppendKey(knownKeyLength ? key.length() : -1);
-      outKey.write(key.getBytes());
-      outKey.close();
-      String value = "value" + nx;
-      DataOutputStream outValue =
-          writer.prepareAppendValue(knownValueLength ? value.length() : -1);
-      outValue.write(value.getBytes());
-      outValue.close();
-      rawDataSize +=
-          WritableUtils.getVIntSize(key.getBytes().length)
-              + key.getBytes().length
-              + WritableUtils.getVIntSize(value.getBytes().length)
-              + value.getBytes().length;
-    }
-    if (close) {
-      closeOutput();
-    }
-    return rawDataSize;
-  }
-
-  private long writeRecords(int count, boolean knownKeyLength,
-      boolean knownValueLength) throws IOException {
-    return writeRecords(count, knownKeyLength, knownValueLength, true);
-  }
-
-  private void closeOutput() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-    if (out != null) {
-      out.close();
-      out = null;
-    }
-  }
-}
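
For reference, a minimal sketch of the DTFile streaming append API exercised
by the tests above (prepareAppendKey/prepareAppendValue, where a length of -1
means the length is not known up front). The path and the writer parameters
here are illustrative assumptions, not part of the commit:

    import java.io.DataOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.file.tfile.DTFile.Writer;

    public class StreamingWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/tfile-test/streaming-sketch"); // assumed path
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path);
        // 512-byte min block size, gz compression, memcmp comparator,
        // mirroring the setUp() of TestTFileStreams above.
        Writer writer = new Writer(out, 512, "gz", "memcmp", conf);

        // Stream a key of known length (4 bytes)...
        DataOutputStream k = writer.prepareAppendKey(4);
        k.write("key0".getBytes());
        k.close();
        // ...then a value whose length is not known up front (-1).
        DataOutputStream v = writer.prepareAppendValue(-1);
        v.write("value0".getBytes());
        v.close();

        writer.close();
        out.close();
      }
    }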

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
deleted file mode 100644
index a58f649..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-public class TestTFileUnsortedByteArrays extends TestCase {
-  private static String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-
-  private final static int BLOCK_SIZE = 512;
-  private final static int BUF_SIZE = 64;
-
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private FSDataOutputStream out;
-  private Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String outputFile = "TFileTestUnsorted";
-  /*
-   * pre-sampled numbers of records in one block, based on the
-   * generated key and value strings
-   */
-  private int records1stBlock = 4314;
-  private int records2ndBlock = 4108;
-
-  public void init(String compression, String outputFile,
-      int numRecords1stBlock, int numRecords2ndBlock) {
-    this.compression = compression;
-    this.outputFile = outputFile;
-    this.records1stBlock = numRecords1stBlock;
-    this.records2ndBlock = numRecords2ndBlock;
-  }
-
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-    writer = new Writer(out, BLOCK_SIZE, compression, null, conf);
-    writer.append("keyZ".getBytes(), "valueZ".getBytes());
-    writer.append("keyM".getBytes(), "valueM".getBytes());
-    writer.append("keyN".getBytes(), "valueN".getBytes());
-    writer.append("keyA".getBytes(), "valueA".getBytes());
-    closeOutput();
-  }
-
-  @Override
-  public void tearDown() throws IOException {
-    fs.delete(path, true);
-  }
-
-  // creating a scanner with keys on an unsorted TFile must fail
-  public void testFailureScannerWithKeys() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Assert.assertFalse(reader.isSorted());
-    Assert.assertEquals((int) reader.getEntryCount(), 4);
-
-    try {
-      Scanner scanner =
-          reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
-      Assert
-          .fail("Failed to catch creating scanner with keys on unsorted file.");
-    }
-    catch (RuntimeException e) {
-    }
-    finally {
-      reader.close();
-    }
-  }
-
-  // we still can scan records in an unsorted TFile
-  public void testScan() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Assert.assertFalse(reader.isSorted());
-    Assert.assertEquals((int) reader.getEntryCount(), 4);
-
-    Scanner scanner = reader.createScanner();
-
-    try {
-
-      // read key and value
-      byte[] kbuf = new byte[BUF_SIZE];
-      int klen = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf);
-      Assert.assertEquals(new String(kbuf, 0, klen), "keyZ");
-
-      byte[] vbuf = new byte[BUF_SIZE];
-      int vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ");
-
-      scanner.advance();
-
-      // now try get value first
-      vbuf = new byte[BUF_SIZE];
-      vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      Assert.assertEquals(new String(vbuf, 0, vlen), "valueM");
-
-      kbuf = new byte[BUF_SIZE];
-      klen = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf);
-      Assert.assertEquals(new String(kbuf, 0, klen), "keyM");
-    }
-    finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  // we still can scan records in an unsorted TFile
-  public void testScanRange() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Assert.assertFalse(reader.isSorted());
-    Assert.assertEquals((int) reader.getEntryCount(), 4);
-
-    Scanner scanner = reader.createScanner();
-
-    try {
-
-      // read key and value
-      byte[] kbuf = new byte[BUF_SIZE];
-      int klen = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf);
-      Assert.assertEquals(new String(kbuf, 0, klen), "keyZ");
-
-      byte[] vbuf = new byte[BUF_SIZE];
-      int vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ");
-
-      scanner.advance();
-
-      // now try get value first
-      vbuf = new byte[BUF_SIZE];
-      vlen = scanner.entry().getValueLength();
-      scanner.entry().getValue(vbuf);
-      Assert.assertEquals(new String(vbuf, 0, vlen), "valueM");
-
-      kbuf = new byte[BUF_SIZE];
-      klen = scanner.entry().getKeyLength();
-      scanner.entry().getKey(kbuf);
-      Assert.assertEquals(new String(kbuf, 0, klen), "keyM");
-    }
-    finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  public void testFailureSeek() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    Scanner scanner = reader.createScanner();
-
-    try {
-      // can't find ceil
-      try {
-        scanner.lowerBound("keyN".getBytes());
-        Assert.fail("Cannot search in a unsorted TFile!");
-      }
-      catch (Exception e) {
-        // no-op, expecting exceptions
-      }
-      finally {
-      }
-
-      // can't find higher
-      try {
-        scanner.upperBound("keyA".getBytes());
-        Assert.fail("Cannot search higher in a unsorted TFile!");
-      }
-      catch (Exception e) {
-        // no-op, expecting exceptions
-      }
-      finally {
-      }
-
-      // can't seek
-      try {
-        scanner.seekTo("keyM".getBytes());
-        Assert.fail("Cannot search a unsorted TFile!");
-      }
-      catch (Exception e) {
-        // no-op, expecting exceptions
-      }
-      finally {
-      }
-    }
-    finally {
-      scanner.close();
-      reader.close();
-    }
-  }
-
-  private void closeOutput() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-      out.close();
-      out = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
new file mode 100644
index 0000000..2f47a76
--- /dev/null
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
+
+/**
+ * A single, globally managed cache.
+ * Users can limit the cache size by number of entries, memory size (in bytes), or percentage of the total heap size.
+ * <br>
+ * <br>
+ * Please refer to <a href="https://code.google.com/p/guava-libraries/wiki/CachesExplained">Guava Cache</a> for details.
+ * <br>
+ * <br>
+ * It keeps {@link String} keys and {@link BlockReader} values.
+ *
+ * @since 2.0.0
+ */
+public class CacheManager
+{
+  public static final int STRING_OVERHEAD = 64;
+
+  public static final int BLOCK_READER_OVERHEAD = 368;
+
+  public static final float DEFAULT_HEAP_MEMORY_PERCENTAGE = 0.25f;
+
+  private static Cache<String, BlockReader> singleCache;
+
+  private static boolean enableStats = false;
+
+  public static final Cache<String, BlockReader> buildCache(CacheBuilder builder) {
+    if (singleCache != null) {
+      singleCache.cleanUp();
+    }
+    if (enableStats) {
+      //todo: when we upgrade to a newer guava version we can use this
+      // builder.recordStats();
+    }
+    singleCache = builder.build();
+    return singleCache;
+  }
+
+  /**
+   * (Re)create the cache, limiting the maximum number of entries.
+   * @param concurrencyLevel
+   * @param initialCapacity
+   * @param maximumSize
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity, int maximumSize) {
+    CacheBuilder builder = CacheBuilder.newBuilder().
+        concurrencyLevel(concurrencyLevel).
+        initialCapacity(initialCapacity).
+        maximumSize(maximumSize);
+
+    return buildCache(builder);
+  }
+
+
+  /**
+   * (Re)create the cache, limiting the total memory (in bytes).
+   * @param concurrencyLevel
+   * @param initialCapacity
+   * @param maximumMemory
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity, long maximumMemory) {
+
+    CacheBuilder builder = CacheBuilder.newBuilder().
+        concurrencyLevel(concurrencyLevel).
+        initialCapacity(initialCapacity).
+        maximumWeight(maximumMemory).weigher(new KVWeigher());
+
+    return buildCache(builder);
+  }
+
+  /**
+   * (Re)create the cache, limiting it to a percentage of the total heap memory.
+   * @param concurrencyLevel
+   * @param initialCapacity
+   * @param heapMemPercentage
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity, float heapMemPercentage) {
+    CacheBuilder builder = CacheBuilder.newBuilder().
+        concurrencyLevel(concurrencyLevel).
+        initialCapacity(initialCapacity).
+        maximumWeight((long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * heapMemPercentage)).weigher(new KVWeigher());
+    return buildCache(builder);
+  }
+
+  public static final void createDefaultCache(){
+
+    long availableMemory = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * DEFAULT_HEAP_MEMORY_PERCENTAGE);
+    CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder().maximumWeight(availableMemory).weigher(new KVWeigher());
+
+    singleCache = buildCache(builder);
+  }
+
+  public static final void put(String key, BlockReader blk){
+    if (singleCache == null) {
+      createDefaultCache();
+    }
+    singleCache.put(key, blk);
+  }
+
+  public static final BlockReader get(String key){
+    if (singleCache == null) {
+      return null;
+    }
+    return singleCache.getIfPresent(key);
+  }
+
+  public static final void invalidateKeys(Collection<String> keys)
+  {
+    if (singleCache != null)
+      singleCache.invalidateAll(keys);
+  }
+
+  public static final long getCacheSize() {
+    if (singleCache != null)
+      return singleCache.size();
+    return 0;
+  }
+
+  public static final class KVWeigher implements Weigher<String, BlockReader> {
+
+    @Override
+    public int weigh(String key, BlockReader value)
+    {
+      return (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) +
+          key.getBytes().length +
+          value.getBlockDataInputStream().getBuf().length;
+    }
+
+  }
+
+  @VisibleForTesting
+  protected static Cache<String, BlockReader> getCache() {
+    return singleCache;
+  }
+
+  public static final void setEnableStats(boolean enable) {
+    enableStats = enable;
+  }
+
+  public static void main(String[] args)
+  {
+
+    // code to estimate the per-instance overhead of the key and value objects;
+    // it depends on the HBase ClassSize utility
+//    System.out.println(ClassSize.estimateBase(BlockReader.class, true) +
+//        ClassSize.estimateBase(Algorithm.class, true) +
+//        ClassSize.estimateBase(RBlockState.class, true) +
+//        ClassSize.estimateBase(ReusableByteArrayInputStream.class, true) +
+//        ClassSize.estimateBase(BlockRegion.class, true));
+//
+//    System.out.println(
+//        ClassSize.estimateBase(String.class, true));
+  }
+
+}
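
For context, a minimal usage sketch of the CacheManager API added above. The
sizing numbers and block keys are illustrative assumptions; DTFile readers
normally populate the cache themselves via put():

    import java.util.Arrays;

    import org.apache.hadoop.io.file.tfile.CacheManager;

    public class CacheManagerSketch {
      public static void main(String[] args) {
        // Limit the shared cache to 64 MB of block data (the long overload
        // selects the memory-weighted cache backed by KVWeigher);
        // concurrency level 4, initial capacity 1024 entries.
        CacheManager.createCache(4, 1024, 64L * 1024 * 1024);

        // A lookup for a block that has not been cached yet returns null.
        System.out.println("hit: " + (CacheManager.get("no-such-block") != null));

        // Evict specific blocks, e.g. when a file is closed or deleted
        // (the key format shown here is hypothetical).
        CacheManager.invalidateKeys(Arrays.asList("fileA:block0", "fileA:block1"));

        System.out.println("cached blocks: " + CacheManager.getCacheSize());
      }
    }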