You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:04 UTC
[5/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment
moved DTFile implementation from contrib to lib
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
deleted file mode 100644
index e513ccd..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.util.Time;
-
-public class TestTFileSeqFileComparison extends TestCase {
- MyOptions options;
-
- private FileSystem fs;
- private Configuration conf;
- private long startTimeEpoch;
- private long finishTimeEpoch;
- private DateFormat formatter;
- byte[][] dictionary;
-
- @Override
- public void setUp() throws IOException {
- if (options == null) {
- options = new MyOptions(new String[0]);
- }
-
- conf = new Configuration();
- conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
- conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
- Path path = new Path(options.rootDir);
- fs = path.getFileSystem(conf);
- formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- setUpDictionary();
- }
-
- private void setUpDictionary() {
- Random rng = new Random();
- dictionary = new byte[options.dictSize][];
- for (int i = 0; i < options.dictSize; ++i) {
- int len =
- rng.nextInt(options.maxWordLen - options.minWordLen)
- + options.minWordLen;
- dictionary[i] = new byte[len];
- rng.nextBytes(dictionary[i]);
- }
- }
-
- @Override
- public void tearDown() throws IOException {
- // do nothing
- }
-
- public void startTime() throws IOException {
- startTimeEpoch = Time.now();
- System.out.println(formatTime() + " Started timing.");
- }
-
- public void stopTime() throws IOException {
- finishTimeEpoch = Time.now();
- System.out.println(formatTime() + " Stopped timing.");
- }
-
- public long getIntervalMillis() throws IOException {
- return finishTimeEpoch - startTimeEpoch;
- }
-
- public void printlnWithTimestamp(String message) throws IOException {
- System.out.println(formatTime() + " " + message);
- }
-
- /*
- * Format millis into minutes and seconds.
- */
- public String formatTime(long milis) {
- return formatter.format(milis);
- }
-
- public String formatTime() {
- return formatTime(Time.now());
- }
-
- private interface KVAppendable {
- public void append(BytesWritable key, BytesWritable value)
- throws IOException;
-
- public void close() throws IOException;
- }
-
- private interface KVReadable {
- public byte[] getKey();
-
- public byte[] getValue();
-
- public int getKeyLength();
-
- public int getValueLength();
-
- public boolean next() throws IOException;
-
- public void close() throws IOException;
- }
-
- static class TFileAppendable implements KVAppendable {
- private FSDataOutputStream fsdos;
- private TFile.Writer writer;
-
- public TFileAppendable(FileSystem fs, Path path, String compress,
- int minBlkSize, int osBufferSize, Configuration conf)
- throws IOException {
- this.fsdos = fs.create(path, true, osBufferSize);
- this.writer = new TFile.Writer(fsdos, minBlkSize, compress, null, conf);
- }
-
- @Override
- public void append(BytesWritable key, BytesWritable value)
- throws IOException {
- writer.append(key.get(), 0, key.getSize(), value.get(), 0, value
- .getSize());
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- fsdos.close();
- }
- }
-
- static class TFileReadable implements KVReadable {
- private FSDataInputStream fsdis;
- private DTFile.Reader reader;
- private DTFile.Reader.Scanner scanner;
- private byte[] keyBuffer;
- private int keyLength;
- private byte[] valueBuffer;
- private int valueLength;
-
- public TFileReadable(FileSystem fs, Path path, int osBufferSize,
- Configuration conf) throws IOException {
- this.fsdis = fs.open(path, osBufferSize);
- this.reader =
- new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
- this.scanner = reader.createScanner();
- keyBuffer = new byte[32];
- valueBuffer = new byte[32];
- }
-
- private void checkKeyBuffer(int size) {
- if (size <= keyBuffer.length) {
- return;
- }
- keyBuffer =
- new byte[Math.max(2 * keyBuffer.length, 2 * size - keyBuffer.length)];
- }
-
- private void checkValueBuffer(int size) {
- if (size <= valueBuffer.length) {
- return;
- }
- valueBuffer =
- new byte[Math.max(2 * valueBuffer.length, 2 * size
- - valueBuffer.length)];
- }
-
- @Override
- public byte[] getKey() {
- return keyBuffer;
- }
-
- @Override
- public int getKeyLength() {
- return keyLength;
- }
-
- @Override
- public byte[] getValue() {
- return valueBuffer;
- }
-
- @Override
- public int getValueLength() {
- return valueLength;
- }
-
- @Override
- public boolean next() throws IOException {
- if (scanner.atEnd()) return false;
- Entry entry = scanner.entry();
- keyLength = entry.getKeyLength();
- checkKeyBuffer(keyLength);
- entry.getKey(keyBuffer);
- valueLength = entry.getValueLength();
- checkValueBuffer(valueLength);
- entry.getValue(valueBuffer);
- scanner.advance();
- return true;
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- reader.close();
- fsdis.close();
- }
- }
-
- static class SeqFileAppendable implements KVAppendable {
- private FSDataOutputStream fsdos;
- private SequenceFile.Writer writer;
-
- public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
- String compress, int minBlkSize) throws IOException {
- Configuration conf = new Configuration();
- conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
- true);
-
- CompressionCodec codec = null;
- if ("lzo".equals(compress)) {
- codec = Compression.Algorithm.LZO.getCodec();
- }
- else if ("gz".equals(compress)) {
- codec = Compression.Algorithm.GZ.getCodec();
- }
- else if (!"none".equals(compress))
- throw new IOException("Codec not supported.");
-
- this.fsdos = fs.create(path, true, osBufferSize);
-
- if (!"none".equals(compress)) {
- writer =
- SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
- BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
- }
- else {
- writer =
- SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
- BytesWritable.class, SequenceFile.CompressionType.NONE, null);
- }
- }
-
- @Override
- public void append(BytesWritable key, BytesWritable value)
- throws IOException {
- writer.append(key, value);
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- fsdos.close();
- }
- }
-
- static class SeqFileReadable implements KVReadable {
- private SequenceFile.Reader reader;
- private BytesWritable key;
- private BytesWritable value;
-
- public SeqFileReadable(FileSystem fs, Path path, int osBufferSize)
- throws IOException {
- Configuration conf = new Configuration();
- conf.setInt("io.file.buffer.size", osBufferSize);
- reader = new SequenceFile.Reader(fs, path, conf);
- key = new BytesWritable();
- value = new BytesWritable();
- }
-
- @Override
- public byte[] getKey() {
- return key.get();
- }
-
- @Override
- public int getKeyLength() {
- return key.getSize();
- }
-
- @Override
- public byte[] getValue() {
- return value.get();
- }
-
- @Override
- public int getValueLength() {
- return value.getSize();
- }
-
- @Override
- public boolean next() throws IOException {
- return reader.next(key, value);
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
- }
-
- private void reportStats(Path path, long totalBytes) throws IOException {
- long duration = getIntervalMillis();
- long fsize = fs.getFileStatus(path).getLen();
- printlnWithTimestamp(String.format(
- "Duration: %dms...total size: %.2fMB...raw thrpt: %.2fMB/s", duration,
- (double) totalBytes / 1024 / 1024, (double) totalBytes / duration
- * 1000 / 1024 / 1024));
- printlnWithTimestamp(String.format(
- "Compressed size: %.2fMB...compressed thrpt: %.2fMB/s.",
- (double) fsize / 1024 / 1024, (double) fsize / duration * 1000 / 1024
- / 1024));
- }
-
- private void fillBuffer(Random rng, BytesWritable bw, byte[] tmp, int len) {
- int n = 0;
- while (n < len) {
- byte[] word = dictionary[rng.nextInt(dictionary.length)];
- int l = Math.min(word.length, len - n);
- System.arraycopy(word, 0, tmp, n, l);
- n += l;
- }
- bw.set(tmp, 0, len);
- }
-
- private void timeWrite(Path path, KVAppendable appendable, int baseKlen,
- int baseVlen, long fileSize) throws IOException {
- int maxKlen = baseKlen * 2;
- int maxVlen = baseVlen * 2;
- BytesWritable key = new BytesWritable();
- BytesWritable value = new BytesWritable();
- byte[] keyBuffer = new byte[maxKlen];
- byte[] valueBuffer = new byte[maxVlen];
- Random rng = new Random(options.seed);
- long totalBytes = 0;
- printlnWithTimestamp("Start writing: " + path.getName() + "...");
- startTime();
-
- for (long i = 0; true; ++i) {
- if (i % 1000 == 0) { // test the size for every 1000 rows.
- if (fs.getFileStatus(path).getLen() >= fileSize) {
- break;
- }
- }
- int klen = rng.nextInt(baseKlen) + baseKlen;
- int vlen = rng.nextInt(baseVlen) + baseVlen;
- fillBuffer(rng, key, keyBuffer, klen);
- fillBuffer(rng, value, valueBuffer, vlen);
- key.set(keyBuffer, 0, klen);
- value.set(valueBuffer, 0, vlen);
- appendable.append(key, value);
- totalBytes += klen;
- totalBytes += vlen;
- }
- stopTime();
- appendable.close();
- reportStats(path, totalBytes);
- }
-
- private void timeRead(Path path, KVReadable readable) throws IOException {
- printlnWithTimestamp("Start reading: " + path.getName() + "...");
- long totalBytes = 0;
- startTime();
- for (; readable.next();) {
- totalBytes += readable.getKeyLength();
- totalBytes += readable.getValueLength();
- }
- stopTime();
- readable.close();
- reportStats(path, totalBytes);
- }
-
- private void createTFile(String parameters, String compress)
- throws IOException {
- System.out.println("=== TFile: Creation (" + parameters + ") === ");
- Path path = new Path(options.rootDir, "TFile.Performance");
- KVAppendable appendable =
- new TFileAppendable(fs, path, compress, options.minBlockSize,
- options.osOutputBufferSize, conf);
- timeWrite(path, appendable, options.keyLength, options.valueLength,
- options.fileSize);
- }
-
- private void readTFile(String parameters, boolean delFile) throws IOException {
- System.out.println("=== TFile: Reading (" + parameters + ") === ");
- {
- Path path = new Path(options.rootDir, "TFile.Performance");
- KVReadable readable =
- new TFileReadable(fs, path, options.osInputBufferSize, conf);
- timeRead(path, readable);
- if (delFile) {
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- }
- }
- }
-
- private void createSeqFile(String parameters, String compress)
- throws IOException {
- System.out.println("=== SeqFile: Creation (" + parameters + ") === ");
- Path path = new Path(options.rootDir, "SeqFile.Performance");
- KVAppendable appendable =
- new SeqFileAppendable(fs, path, options.osOutputBufferSize, compress,
- options.minBlockSize);
- timeWrite(path, appendable, options.keyLength, options.valueLength,
- options.fileSize);
- }
-
- private void readSeqFile(String parameters, boolean delFile)
- throws IOException {
- System.out.println("=== SeqFile: Reading (" + parameters + ") === ");
- Path path = new Path(options.rootDir, "SeqFile.Performance");
- KVReadable readable =
- new SeqFileReadable(fs, path, options.osInputBufferSize);
- timeRead(path, readable);
- if (delFile) {
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- }
- }
-
- private void compareRun(String compress) throws IOException {
- String[] supported = TFile.getSupportedCompressionAlgorithms();
- boolean proceed = false;
- for (String c : supported) {
- if (c.equals(compress)) {
- proceed = true;
- break;
- }
- }
-
- if (!proceed) {
- System.out.println("Skipped for " + compress);
- return;
- }
-
- options.compress = compress;
- String parameters = parameters2String(options);
- createSeqFile(parameters, compress);
- readSeqFile(parameters, true);
- createTFile(parameters, compress);
- readTFile(parameters, true);
- createTFile(parameters, compress);
- readTFile(parameters, true);
- createSeqFile(parameters, compress);
- readSeqFile(parameters, true);
- }
-
- public void testRunComparisons() throws IOException {
- String[] compresses = new String[] { "none", "lzo", "gz" };
- for (String compress : compresses) {
- if (compress.equals("none")) {
- conf
- .setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeNone);
- conf.setInt("tfile.fs.output.buffer.size",
- options.fsOutputBufferSizeNone);
- }
- else if (compress.equals("lzo")) {
- conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeLzo);
- conf.setInt("tfile.fs.output.buffer.size",
- options.fsOutputBufferSizeLzo);
- }
- else {
- conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeGz);
- conf
- .setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSizeGz);
- }
- compareRun(compress);
- }
- }
-
- private static String parameters2String(MyOptions options) {
- return String
- .format(
- "KLEN: %d-%d... VLEN: %d-%d...MinBlkSize: %.2fKB...Target Size: %.2fMB...Compression: ...%s",
- options.keyLength, options.keyLength * 2, options.valueLength,
- options.valueLength * 2, (double) options.minBlockSize / 1024,
- (double) options.fileSize / 1024 / 1024, options.compress);
- }
-
- private static class MyOptions {
- String rootDir =
- System
- .getProperty("test.build.data", "/tmp/tfile-test");
- String compress = "gz";
- String format = "tfile";
- int dictSize = 1000;
- int minWordLen = 5;
- int maxWordLen = 20;
- int keyLength = 50;
- int valueLength = 100;
- int minBlockSize = 256 * 1024;
- int fsOutputBufferSize = 1;
- int fsInputBufferSize = 0;
- // special variable only for unit testing.
- int fsInputBufferSizeNone = 0;
- int fsInputBufferSizeGz = 0;
- int fsInputBufferSizeLzo = 0;
- int fsOutputBufferSizeNone = 1;
- int fsOutputBufferSizeGz = 1;
- int fsOutputBufferSizeLzo = 1;
-
- // un-exposed parameters.
- int osInputBufferSize = 64 * 1024;
- int osOutputBufferSize = 64 * 1024;
-
- long fileSize = 3 * 1024 * 1024;
- long seed;
-
- static final int OP_CREATE = 1;
- static final int OP_READ = 2;
- int op = OP_READ;
-
- boolean proceed = false;
-
- public MyOptions(String[] args) {
- seed = System.nanoTime();
-
- try {
- Options opts = buildOptions();
- CommandLineParser parser = new GnuParser();
- CommandLine line = parser.parse(opts, args, true);
- processOptions(line, opts);
- validateOptions();
- }
- catch (ParseException e) {
- System.out.println(e.getMessage());
- System.out.println("Try \"--help\" option for details.");
- setStopProceed();
- }
- }
-
- public boolean proceed() {
- return proceed;
- }
-
- private Options buildOptions() {
- Option compress =
- OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
- .hasArg().withDescription("compression scheme").create('c');
-
- Option ditSize =
- OptionBuilder.withLongOpt("dict").withArgName("size").hasArg()
- .withDescription("number of dictionary entries").create('d');
-
- Option fileSize =
- OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
- .hasArg().withDescription("target size of the file (in MB).")
- .create('s');
-
- Option format =
- OptionBuilder.withLongOpt("format").withArgName("[tfile|seqfile]")
- .hasArg().withDescription("choose TFile or SeqFile").create('f');
-
- Option fsInputBufferSz =
- OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
- .hasArg().withDescription(
- "size of the file system input buffer (in bytes).").create(
- 'i');
-
- Option fsOutputBufferSize =
- OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
- .hasArg().withDescription(
- "size of the file system output buffer (in bytes).").create(
- 'o');
-
- Option keyLen =
- OptionBuilder
- .withLongOpt("key-length")
- .withArgName("length")
- .hasArg()
- .withDescription(
- "base length of the key (in bytes), actual length varies in [base, 2*base)")
- .create('k');
-
- Option valueLen =
- OptionBuilder
- .withLongOpt("value-length")
- .withArgName("length")
- .hasArg()
- .withDescription(
- "base length of the value (in bytes), actual length varies in [base, 2*base)")
- .create('v');
-
- Option wordLen =
- OptionBuilder.withLongOpt("word-length").withArgName("min,max")
- .hasArg().withDescription(
- "range of dictionary word length (in bytes)").create('w');
-
- Option blockSz =
- OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
- .withDescription("minimum block size (in KB)").create('b');
-
- Option seed =
- OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
- .withDescription("specify the seed").create('S');
-
- Option operation =
- OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
- .withDescription(
- "action: read-only, create-only, read-after-create").create(
- 'x');
-
- Option rootDir =
- OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
- .withDescription(
- "specify root directory where files will be created.")
- .create('r');
-
- Option help =
- OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
- "show this screen").create("h");
-
- return new Options().addOption(compress).addOption(ditSize).addOption(
- fileSize).addOption(format).addOption(fsInputBufferSz).addOption(
- fsOutputBufferSize).addOption(keyLen).addOption(wordLen).addOption(
- blockSz).addOption(rootDir).addOption(valueLen).addOption(operation)
- .addOption(help);
-
- }
-
- private void processOptions(CommandLine line, Options opts)
- throws ParseException {
- // --help -h and --version -V must be processed first.
- if (line.hasOption('h')) {
- HelpFormatter formatter = new HelpFormatter();
- System.out.println("TFile and SeqFile benchmark.");
- System.out.println();
- formatter.printHelp(100,
- "java ... TestTFileSeqFileComparison [options]",
- "\nSupported options:", opts, "");
- return;
- }
-
- if (line.hasOption('c')) {
- compress = line.getOptionValue('c');
- }
-
- if (line.hasOption('d')) {
- dictSize = Integer.parseInt(line.getOptionValue('d'));
- }
-
- if (line.hasOption('s')) {
- fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
- }
-
- if (line.hasOption('f')) {
- format = line.getOptionValue('f');
- }
-
- if (line.hasOption('i')) {
- fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
- }
-
- if (line.hasOption('o')) {
- fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
- }
-
- if (line.hasOption('k')) {
- keyLength = Integer.parseInt(line.getOptionValue('k'));
- }
-
- if (line.hasOption('v')) {
- valueLength = Integer.parseInt(line.getOptionValue('v'));
- }
-
- if (line.hasOption('b')) {
- minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
- }
-
- if (line.hasOption('r')) {
- rootDir = line.getOptionValue('r');
- }
-
- if (line.hasOption('S')) {
- seed = Long.parseLong(line.getOptionValue('S'));
- }
-
- if (line.hasOption('w')) {
- String min_max = line.getOptionValue('w');
- StringTokenizer st = new StringTokenizer(min_max, " \t,");
- if (st.countTokens() != 2) {
- throw new ParseException("Bad word length specification: " + min_max);
- }
- minWordLen = Integer.parseInt(st.nextToken());
- maxWordLen = Integer.parseInt(st.nextToken());
- }
-
- if (line.hasOption('x')) {
- String strOp = line.getOptionValue('x');
- if (strOp.equals("r")) {
- op = OP_READ;
- }
- else if (strOp.equals("w")) {
- op = OP_CREATE;
- }
- else if (strOp.equals("rw")) {
- op = OP_CREATE | OP_READ;
- }
- else {
- throw new ParseException("Unknown action specifier: " + strOp);
- }
- }
-
- proceed = true;
- }
-
- private void validateOptions() throws ParseException {
- if (!compress.equals("none") && !compress.equals("lzo")
- && !compress.equals("gz")) {
- throw new ParseException("Unknown compression scheme: " + compress);
- }
-
- if (!format.equals("tfile") && !format.equals("seqfile")) {
- throw new ParseException("Unknown file format: " + format);
- }
-
- if (minWordLen >= maxWordLen) {
- throw new ParseException(
- "Max word length must be greater than min word length.");
- }
- return;
- }
-
- private void setStopProceed() {
- proceed = false;
- }
-
- public boolean doCreate() {
- return (op & OP_CREATE) != 0;
- }
-
- public boolean doRead() {
- return (op & OP_READ) != 0;
- }
- }
-
- public static void main(String[] args) throws IOException {
- TestTFileSeqFileComparison testCase = new TestTFileSeqFileComparison();
- MyOptions options = new MyOptions(args);
- if (options.proceed == false) {
- return;
- }
- testCase.options = options;
- String parameters = parameters2String(options);
-
- testCase.setUp();
- if (testCase.options.format.equals("tfile")) {
- if (options.doCreate()) {
- testCase.createTFile(parameters, options.compress);
- }
- if (options.doRead()) {
- testCase.readTFile(parameters, options.doCreate());
- }
- }
- else {
- if (options.doCreate()) {
- testCase.createSeqFile(parameters, options.compress);
- }
- if (options.doRead()) {
- testCase.readSeqFile(parameters, options.doCreate());
- }
- }
- testCase.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
deleted file mode 100644
index aad563d..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-/**
- * Tests DTFile scanners created by byte range and by record number, and the conversions between
- * record numbers and locations, on files written with "none" and "gz" compression.
- */
-public class TestTFileSplit extends TestCase {
-  private static final String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private static final int BLOCK_SIZE = 64 * 1024;
-
-  private static final String KEY = "key";
-  private static final String VALUE = "value";
-
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private final Random random = new Random();
-
-  private final String comparator = "memcmp";
-  private final String outputFile = "TestTFileSplit";
-
-  /** Writes {@code count} key/value pairs with ascending keys to ROOT/TestTFileSplit.compress. */
-  void createFile(int count, String compress) throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile + "." + compress);
-    fs = path.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(path);
-    try {
-      Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf);
-      try {
-        for (int nx = 0; nx < count; nx++) {
-          byte[] key = composeSortedKey(KEY, count, nx).getBytes();
-          byte[] value = (VALUE + nx).getBytes();
-          writer.append(key, value);
-        }
-      } finally {
-        writer.close();
-      }
-    } finally {
-      out.close();
-    }
-  }
-
-  /**
-   * Scans the file in 10 byte-range splits; every split must be non-empty and the splits together
-   * must cover every entry exactly once.
-   */
-  void readFile() throws IOException {
-    long fileLength = fs.getFileStatus(path).getLen();
-    int numSplit = 10;
-    long splitSize = fileLength / numSplit + 1;
-
-    org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
-    try {
-      Reader reader = new Reader(in, fileLength, conf);
-      try {
-        long offset = 0;
-        long rowCount = 0;
-        for (int i = 0; i < numSplit; ++i, offset += splitSize) {
-          Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
-          int count = 0;
-          try {
-            BytesWritable key = new BytesWritable();
-            BytesWritable value = new BytesWritable();
-            while (!scanner.atEnd()) {
-              scanner.entry().get(key, value);
-              ++count;
-              scanner.advance();
-            }
-          } finally {
-            scanner.close();
-          }
-          Assert.assertTrue(count > 0);
-          rowCount += count;
-        }
-        Assert.assertEquals(reader.getEntryCount(), rowCount);
-      } finally {
-        reader.close();
-      }
-    } finally {
-      // The stream is opened here, so it is closed here as well.
-      in.close();
-    }
-  }
-
-  /* Similar to readFile(), tests the scanner created
-   * by record numbers rather than the offsets.
-   */
-  void readRowSplits(int numSplits) throws IOException {
-    long fileLength = fs.getFileStatus(path).getLen();
-    org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
-    try {
-      Reader reader = new Reader(in, fileLength, conf);
-      try {
-        long totalRecords = reader.getEntryCount();
-        for (int i = 0; i < numSplits; i++) {
-          long startRec = i * totalRecords / numSplits;
-          long endRec = (i + 1) * totalRecords / numSplits;
-          if (i == numSplits - 1) {
-            endRec = totalRecords;
-          }
-          Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
-          int count = 0;
-          try {
-            BytesWritable key = new BytesWritable();
-            BytesWritable value = new BytesWritable();
-            long x = startRec;
-            while (!scanner.atEnd()) {
-              assertEquals("Incorrect RecNum returned by scanner", x, scanner.getRecordNum());
-              scanner.entry().get(key, value);
-              ++count;
-              assertEquals("Incorrect RecNum returned by scanner", x, scanner.getRecordNum());
-              scanner.advance();
-              ++x;
-            }
-          } finally {
-            scanner.close();
-          }
-          Assert.assertEquals(endRec - startRec, count);
-        }
-        // make sure specifying range at the end gives zero records.
-        Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
-        try {
-          Assert.assertTrue(scanner.atEnd());
-        } finally {
-          scanner.close();
-        }
-      } finally {
-        reader.close();
-      }
-    } finally {
-      in.close();
-    }
-  }
-
-  /**
-   * Returns {@code prefix} followed by {@code value} zero-padded to 10 digits, so keys sort in
-   * numeric order under a byte comparison. {@code total} is not used.
-   */
-  static String composeSortedKey(String prefix, int total, int value) {
-    return String.format("%s%010d", prefix, value);
-  }
-
-  /** Opens the current file and checks record-number/location conversions on it. */
-  void checkRecNums() throws IOException {
-    long fileLen = fs.getFileStatus(path).getLen();
-    org.apache.hadoop.fs.FSDataInputStream in = fs.open(path);
-    try {
-      Reader reader = new Reader(in, fileLen, conf);
-      try {
-        verifyRecNums(reader, fileLen);
-      } finally {
-        reader.close();
-      }
-    } finally {
-      in.close();
-    }
-  }
-
-  /** Asserts record-number/location round trips on {@code reader} (a file of fileLen bytes). */
-  private void verifyRecNums(Reader reader, long fileLen) throws IOException {
-    long totalRecs = reader.getEntryCount();
-    long half = totalRecs / 2;
-    // Random window [begin, end): begin in [0, half), end in [half + 1, 2 * half].
-    long begin = random.nextLong() % half;
-    if (begin < 0) {
-      begin += half;
-    }
-    long end = random.nextLong() % half;
-    if (end < 0) {
-      end += half;
-    }
-    end += half + 1;
-
-    assertEquals("RecNum for offset=0 should be 0", 0, reader.getRecordNumNear(0));
-    for (long x : new long[] {fileLen, fileLen + 1, 2 * fileLen}) {
-      assertEquals("RecNum for offset>=fileLen should be total entries",
-          totalRecs, reader.getRecordNumNear(x));
-    }
-
-    for (long i = 0; i < 100; ++i) {
-      assertEquals("Locaton to RecNum conversion not symmetric", i,
-          reader.getRecordNumByLocation(reader.getLocationByRecordNum(i)));
-    }
-
-    for (long i = 1; i < 100; ++i) {
-      long x = totalRecs - i;
-      assertEquals("Locaton to RecNum conversion not symmetric", x,
-          reader.getRecordNumByLocation(reader.getLocationByRecordNum(x)));
-    }
-
-    for (long i = begin; i < end; ++i) {
-      assertEquals("Locaton to RecNum conversion not symmetric", i,
-          reader.getRecordNumByLocation(reader.getLocationByRecordNum(i)));
-    }
-
-    for (int i = 0; i < 1000; ++i) {
-      long x = random.nextLong() % totalRecs;
-      if (x < 0) {
-        x += totalRecs;
-      }
-      assertEquals("Locaton to RecNum conversion not symmetric", x,
-          reader.getRecordNumByLocation(reader.getLocationByRecordNum(x)));
-    }
-  }
-
-  public void testSplit() throws IOException {
-    System.out.println("testSplit");
-    createFile(100000, Compression.Algorithm.NONE.getName());
-    checkRecNums();
-    readFile();
-    readRowSplits(10);
-    fs.delete(path, true);
-    createFile(500000, Compression.Algorithm.GZ.getName());
-    checkRecNums();
-    readFile();
-    readRowSplits(83);
-    fs.delete(path, true);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
deleted file mode 100644
index 2e0506c..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-/**
- *
- * Streaming interfaces test case class using GZ compression codec, base class
- * of none and LZO compression classes.
- *
- */
-
-public class TestTFileStreams extends TestCase {
-  // Directory for the scratch file; overridable with -Dtest.build.data.
-  private static final String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private final static int BLOCK_SIZE = 512;
-  private final static int K = 1024;
-  // When true, every test returns immediately and tearDown() does no cleanup
-  // (presumably set by codec-specific subclasses whose codec is unavailable).
-  protected boolean skip = false;
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private FSDataOutputStream out;
-  Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String comparator = "memcmp";
-  private final String outputFile = getClass().getSimpleName();
-
-  /**
-   * Overrides the codec and key comparator that {@link #setUp()} hands to the writer.
-   *
-   * @param compression compression algorithm name
-   * @param comparator key comparator name
-   */
-  public void init(String compression, String comparator) {
-    this.compression = compression;
-    this.comparator = comparator;
-  }
-
-  /** Creates the output file (named after the runtime test class) and opens a writer on it. */
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
-  }
-
-  /** Closes whatever output is still open (best effort) and deletes the file. */
-  @Override
-  public void tearDown() throws IOException {
-    if (!skip) {
-      try {
-        closeOutput();
-      } catch (Exception ignored) {
-        // Best effort: failure tests can leave the writer in a state where close() throws.
-      }
-      fs.delete(path, true);
-    }
-  }
-
-  public void testNoEntry() throws IOException {
-    if (skip) {
-      return;
-    }
-    closeOutput();
-    TestDTFileByteArrays.readRecords(fs, path, 0, conf);
-  }
-
-  public void testOneEntryKnownLength() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(1, true, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  public void testOneEntryUnknownLength() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(1, false, false);
-
-    // TODO: will throw exception at getValueLength, it's inconsistent though;
-    // getKeyLength returns a value correctly, though initial length is -1
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  // known key length, unknown value length
-  public void testOneEntryMixedLengths1() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(1, true, false);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  // unknown key length, known value length
-  public void testOneEntryMixedLengths2() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(1, false, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 1, conf);
-  }
-
-  public void testTwoEntriesKnownLength() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(2, true, true);
-
-    TestDTFileByteArrays.readRecords(fs, path, 2, conf);
-  }
-
-  // Negative test: closing with a key but no value must be rejected.
-  public void testFailureAddKeyWithoutValue() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream dos = writer.prepareAppendKey(-1);
-    dos.write("key0".getBytes());
-    try {
-      closeOutput();
-      fail("Cannot add only a key without a value. ");
-    } catch (IllegalStateException expected) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureAddValueWithoutKey() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outValue = null;
-    try {
-      outValue = writer.prepareAppendValue(6);
-      outValue.write("value0".getBytes());
-      fail("Cannot add a value without adding key first. ");
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    } finally {
-      if (outValue != null) {
-        outValue.close();
-      }
-    }
-  }
-
-  public void testFailureOneEntryKnownLength() throws IOException {
-    if (skip) {
-      return;
-    }
-    // Declares a 2-byte key, then writes 4 bytes: the write must throw.
-    DataOutputStream outKey = writer.prepareAppendKey(2);
-    try {
-      outKey.write("key0".getBytes());
-      fail("Specified key length mismatched the actual key length.");
-    } catch (IOException expected) {
-      // noop, expecting an exception
-    }
-
-    // NOTE(review): there is no fail() here, so this part only checks that
-    // nothing other than an Exception escapes; it does not require one.
-    DataOutputStream outValue = null;
-    try {
-      outValue = writer.prepareAppendValue(6);
-      outValue.write("value0".getBytes());
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureKeyTooLong() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outKey = writer.prepareAppendKey(2);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-      Assert.fail("Key is longer than requested.");
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    }
-  }
-
-  // NOTE(review): despite the name, this checks a *value* shorter than declared
-  // (15 declared, 6 written); testFailureValueTooShort checks the key case.
-  public void testFailureKeyTooShort() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    outKey.write("key0".getBytes());
-    outKey.close();
-    DataOutputStream outValue = writer.prepareAppendValue(15);
-    try {
-      outValue.write("value0".getBytes());
-      outValue.close();
-      Assert.fail("Value is shorter than expected.");
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureValueTooLong() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    outKey.write("key0".getBytes());
-    outKey.close();
-    DataOutputStream outValue = writer.prepareAppendValue(3);
-    try {
-      outValue.write("value0".getBytes());
-      outValue.close();
-      Assert.fail("Value is longer than expected.");
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    }
-
-    // Closing an already-closed key stream again must be harmless.
-    try {
-      outKey.close();
-      outKey.close();
-    } catch (Exception e) {
-      Assert.fail("Second or more close() should have no effect.");
-    }
-  }
-
-  // NOTE(review): despite the name, this checks a *key* shorter than declared
-  // (8 declared, 4 written).
-  public void testFailureValueTooShort() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outKey = writer.prepareAppendKey(8);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-      Assert.fail("Key is shorter than expected.");
-    } catch (Exception expected) {
-      // noop, expecting an exception
-    }
-  }
-
-  public void testFailureCloseKeyStreamManyTimesInWriter() throws IOException {
-    if (skip) {
-      return;
-    }
-    DataOutputStream outKey = writer.prepareAppendKey(4);
-    try {
-      outKey.write("key0".getBytes());
-      outKey.close();
-    } catch (Exception ignored) {
-      // noop, expecting an exception
-    } finally {
-      try {
-        outKey.close();
-      } catch (Exception ignored) {
-        // no-op
-      }
-    }
-    // Reaching the end without an exception is the success condition.
-    outKey.close();
-    outKey.close();
-  }
-
-  public void testFailureKeyLongerThan64K() throws IOException {
-    if (skip) {
-      return;
-    }
-    try {
-      writer.prepareAppendKey(64 * K + 1);
-      Assert.fail("Failed to handle key longer than 64K.");
-    } catch (IndexOutOfBoundsException expected) {
-      // noop, expecting exceptions
-    }
-    closeOutput();
-  }
-
-  public void testFailureKeyLongerThan64K_2() throws IOException {
-    if (skip) {
-      return;
-    }
-    // Unknown key length: the overflow is only detected while streaming K + 2 chunks of K bytes.
-    DataOutputStream outKey = writer.prepareAppendKey(-1);
-    try {
-      byte[] buf = new byte[K];
-      Random rand = new Random();
-      for (int nx = 0; nx < K + 2; nx++) {
-        rand.nextBytes(buf);
-        outKey.write(buf);
-      }
-      outKey.close();
-      Assert.fail("Failed to handle key longer than 64K.");
-    } catch (EOFException expected) {
-      // noop, expecting exceptions
-    } finally {
-      try {
-        closeOutput();
-      } catch (Exception ignored) {
-        // no-op
-      }
-    }
-  }
-
-  public void testFailureNegativeOffset() throws IOException {
-    if (skip) {
-      return;
-    }
-    writeRecords(2, true, true);
-
-    // Close the reader and scanner even when an assertion or unexpected exception escapes.
-    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    try {
-      Scanner scanner = reader.createScanner();
-      try {
-        byte[] buf = new byte[K];
-        try {
-          scanner.entry().getKey(buf, -1);
-          Assert.fail("Failed to handle key negative offset.");
-        } catch (Exception expected) {
-          // noop, expecting exceptions
-        }
-      } finally {
-        scanner.close();
-      }
-    } finally {
-      reader.close();
-    }
-  }
-
-  /**
-   * Verify that the compressed data size is less than raw data size.
-   *
-   * @throws IOException
-   */
-  public void testFailureCompressionNotWorking() throws IOException {
-    if (skip) {
-      return;
-    }
-    long rawDataSize = writeRecords(10000, false, false, false);
-    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
-      Assert.assertTrue(out.getPos() < rawDataSize);
-    }
-    closeOutput();
-  }
-
-  /** Same as {@link #testFailureCompressionNotWorking()} but with declared key/value lengths. */
-  public void testFailureCompressionNotWorking2() throws IOException {
-    if (skip) {
-      return;
-    }
-    long rawDataSize = writeRecords(10000, true, true, false);
-    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
-      Assert.assertTrue(out.getPos() < rawDataSize);
-    }
-    closeOutput();
-  }
-
-  /**
-   * Appends {@code count} records through the streaming key/value API.
-   *
-   * @param count number of records; keys come from TestDTFileByteArrays.composeSortedKey("key", i)
-   * @param knownKeyLength whether to declare the key length up front (otherwise -1)
-   * @param knownValueLength whether to declare the value length up front (otherwise -1)
-   * @param close whether to close the writer and output stream afterwards
-   * @return size of the records in raw form: a vint length prefix plus the bytes, for every key and value
-   */
-  private long writeRecords(int count, boolean knownKeyLength,
-      boolean knownValueLength, boolean close) throws IOException {
-    long rawDataSize = 0;
-    for (int nx = 0; nx < count; nx++) {
-      // Encode once; the declared length must be the byte count actually written.
-      byte[] key = TestDTFileByteArrays.composeSortedKey("key", nx).getBytes();
-      DataOutputStream outKey =
-          writer.prepareAppendKey(knownKeyLength ? key.length : -1);
-      outKey.write(key);
-      outKey.close();
-      byte[] value = ("value" + nx).getBytes();
-      DataOutputStream outValue =
-          writer.prepareAppendValue(knownValueLength ? value.length : -1);
-      outValue.write(value);
-      outValue.close();
-      rawDataSize +=
-          WritableUtils.getVIntSize(key.length) + key.length
-          + WritableUtils.getVIntSize(value.length) + value.length;
-    }
-    if (close) {
-      closeOutput();
-    }
-    return rawDataSize;
-  }
-
-  private long writeRecords(int count, boolean knownKeyLength,
-      boolean knownValueLength) throws IOException {
-    return writeRecords(count, knownKeyLength, knownValueLength, true);
-  }
-
-  /** Closes the writer, then the output stream; each is nulled so repeated calls are no-ops. */
-  private void closeOutput() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-    if (out != null) {
-      out.close();
-      out = null;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
deleted file mode 100644
index a58f649..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.Assert;
-
-public class TestTFileUnsortedByteArrays extends TestCase {
-  // Directory for the scratch file; overridable with -Dtest.build.data.
-  private static final String ROOT =
-      System.getProperty("test.build.data", "/tmp/tfile-test");
-
-  private final static int BLOCK_SIZE = 512;
-  private final static int BUF_SIZE = 64;
-
-  private FileSystem fs;
-  private Configuration conf;
-  private Path path;
-  private FSDataOutputStream out;
-  private Writer writer;
-
-  private String compression = Compression.Algorithm.GZ.getName();
-  private String outputFile = "TFileTestUnsorted";
-  /*
-   * Pre-sampled numbers of records in one block, based on the generated key
-   * and value strings.
-   * NOTE(review): not read by any test in this class; kept because init() exposes them.
-   */
-  private int records1stBlock = 4314;
-  private int records2ndBlock = 4108;
-
-  public void init(String compression, String outputFile,
-      int numRecords1stBlock, int numRecords2ndBlock) {
-    this.compression = compression;
-    this.outputFile = outputFile;
-    this.records1stBlock = numRecords1stBlock;
-    this.records2ndBlock = numRecords2ndBlock;
-  }
-
-  /**
-   * Writes four entries in non-sorted key order (Z, M, N, A) with no comparator;
-   * the tests below expect the resulting file to report itself as unsorted.
-   */
-  @Override
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    path = new Path(ROOT, outputFile);
-    fs = path.getFileSystem(conf);
-    out = fs.create(path);
-    writer = new Writer(out, BLOCK_SIZE, compression, null, conf);
-    writer.append("keyZ".getBytes(), "valueZ".getBytes());
-    writer.append("keyM".getBytes(), "valueM".getBytes());
-    writer.append("keyN".getBytes(), "valueN".getBytes());
-    writer.append("keyA".getBytes(), "valueA".getBytes());
-    closeOutput();
-  }
-
-  @Override
-  public void tearDown() throws IOException {
-    fs.delete(path, true);
-  }
-
-  // Key-range scanners must be rejected on an unsorted TFile.
-  public void testFailureScannerWithKeys() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    try {
-      assertUnsortedWithFourEntries(reader);
-      try {
-        reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
-        Assert
-            .fail("Failed to catch creating scanner with keys on unsorted file.");
-      } catch (RuntimeException expected) {
-        // noop, expecting an exception
-      }
-    } finally {
-      reader.close();
-    }
-  }
-
-  // we still can scan records in an unsorted TFile
-  public void testScan() throws IOException {
-    verifyFullScanOfFirstTwoEntries();
-  }
-
-  // we still can scan records in an unsorted TFile
-  // NOTE(review): identical to testScan(); no key range is actually exercised here.
-  public void testScanRange() throws IOException {
-    verifyFullScanOfFirstTwoEntries();
-  }
-
-  public void testFailureSeek() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    try {
-      Scanner scanner = reader.createScanner();
-      try {
-        // can't find ceil
-        try {
-          scanner.lowerBound("keyN".getBytes());
-          Assert.fail("Cannot search in a unsorted TFile!");
-        } catch (Exception expected) {
-          // noop, expecting exceptions
-        }
-
-        // can't find higher
-        try {
-          scanner.upperBound("keyA".getBytes());
-          Assert.fail("Cannot search higher in a unsorted TFile!");
-        } catch (Exception expected) {
-          // noop, expecting exceptions
-        }
-
-        // can't seek
-        try {
-          scanner.seekTo("keyM".getBytes());
-          Assert.fail("Cannot search a unsorted TFile!");
-        } catch (Exception expected) {
-          // noop, expecting exceptions
-        }
-      } finally {
-        scanner.close();
-      }
-    } finally {
-      reader.close();
-    }
-  }
-
-  /**
-   * Opens the file, checks it is unsorted with four entries, and reads the
-   * first two entries with a full scanner. Reader and scanner are always closed.
-   */
-  private void verifyFullScanOfFirstTwoEntries() throws IOException {
-    Reader reader =
-        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-    try {
-      assertUnsortedWithFourEntries(reader);
-      Scanner scanner = reader.createScanner();
-      try {
-        // read key and value
-        Assert.assertEquals("keyZ", readKey(scanner));
-        Assert.assertEquals("valueZ", readValue(scanner));
-
-        scanner.advance();
-
-        // now try get value first
-        Assert.assertEquals("valueM", readValue(scanner));
-        Assert.assertEquals("keyM", readKey(scanner));
-      } finally {
-        scanner.close();
-      }
-    } finally {
-      reader.close();
-    }
-  }
-
-  /** Checks the fixture written by setUp(): unsorted, exactly four entries. */
-  private static void assertUnsortedWithFourEntries(Reader reader) {
-    Assert.assertFalse(reader.isSorted());
-    Assert.assertEquals(4, reader.getEntryCount());
-  }
-
-  /** Reads the current entry's key into a BUF_SIZE buffer and decodes it. */
-  private static String readKey(Scanner scanner) throws IOException {
-    byte[] kbuf = new byte[BUF_SIZE];
-    int klen = scanner.entry().getKeyLength();
-    scanner.entry().getKey(kbuf);
-    return new String(kbuf, 0, klen);
-  }
-
-  /** Reads the current entry's value into a BUF_SIZE buffer and decodes it. */
-  private static String readValue(Scanner scanner) throws IOException {
-    byte[] vbuf = new byte[BUF_SIZE];
-    int vlen = scanner.entry().getValueLength();
-    scanner.entry().getValue(vbuf);
-    return new String(vbuf, 0, vlen);
-  }
-
-  /** Closes the writer and the output stream independently; repeated calls are no-ops. */
-  private void closeOutput() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-    if (out != null) {
-      out.close();
-      out = null;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
new file mode 100644
index 0000000..2f47a76
--- /dev/null
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
+
+/**
+ * A single, JVM-wide cache of {@link BlockReader}s keyed by {@link String}.
+ * <br>
+ * <br>
+ * The cache can be bounded by number of entries, by an estimated memory size in bytes,
+ * or by a fraction of the maximum heap size. Please refer to
+ * <a href="https://code.google.com/p/guava-libraries/wiki/CachesExplained">Guava Cache</a> for details.
+ * <br>
+ * <br>
+ * If no cache has been created when the first entry is {@link #put}, a default cache bounded
+ * to {@link #DEFAULT_HEAP_MEMORY_PERCENTAGE} of the maximum heap is created.
+ *
+ * @since 2.0.0
+ */
+public class CacheManager
+{
+  /** Estimated fixed size, in bytes, charged per key by {@link KVWeigher} on top of the key's bytes. */
+  public static final int STRING_OVERHEAD = 64;
+
+  /** Estimated fixed size, in bytes, charged per value by {@link KVWeigher} on top of its buffer length. */
+  public static final int BLOCK_READER_OVERHEAD = 368;
+
+  /** Fraction of the maximum heap that {@link #createDefaultCache()} allots to the cache. */
+  public static final float DEFAULT_HEAP_MEMORY_PERCENTAGE = 0.25f;
+
+  // volatile: this is a process-wide singleton that may be (re)built and read from different threads.
+  private static volatile Cache<String, BlockReader> singleCache;
+
+  private static boolean enableStats = false;
+
+  /**
+   * (Re)Creates the global cache from the given builder, replacing any existing cache.
+   *
+   * @param builder a configured Guava cache builder
+   * @return the newly built cache, which is now the global cache
+   */
+  @SuppressWarnings("unchecked")
+  public static final synchronized Cache<String, BlockReader> buildCache(CacheBuilder builder)
+  {
+    if (singleCache != null) {
+      singleCache.cleanUp();
+    }
+    if (enableStats) {
+      //todo: when we upgrade to a newer guava version we can use this
+      // builder.recordStats();
+    }
+    singleCache = builder.build();
+    return singleCache;
+  }
+
+  /**
+   * (Re)Creates the cache, limiting the maximum number of entries.
+   *
+   * @param concurrencyLevel Guava concurrency level (expected number of concurrently updating threads)
+   * @param initialCapacity initial capacity of the cache's internal tables
+   * @param maximumSize maximum number of entries
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity,
+      int maximumSize)
+  {
+    CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
+        .concurrencyLevel(concurrencyLevel)
+        .initialCapacity(initialCapacity)
+        .maximumSize(maximumSize);
+    return buildCache(builder);
+  }
+
+  /**
+   * (Re)Creates the cache, limiting its estimated memory (in bytes) as computed by {@link KVWeigher}.
+   *
+   * @param concurrencyLevel Guava concurrency level (expected number of concurrently updating threads)
+   * @param initialCapacity initial capacity of the cache's internal tables
+   * @param maximumMemory maximum total weight, in bytes
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity,
+      long maximumMemory)
+  {
+    CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder()
+        .concurrencyLevel(concurrencyLevel)
+        .initialCapacity(initialCapacity)
+        .maximumWeight(maximumMemory)
+        .weigher(new KVWeigher());
+    return buildCache(builder);
+  }
+
+  /**
+   * (Re)Creates the cache, limiting its estimated memory to a fraction of the maximum heap.
+   *
+   * @param concurrencyLevel Guava concurrency level (expected number of concurrently updating threads)
+   * @param initialCapacity initial capacity of the cache's internal tables
+   * @param heapMemPercentage fraction of the maximum heap, e.g. 0.25f for a quarter
+   * @return The cache.
+   */
+  public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity,
+      float heapMemPercentage)
+  {
+    CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder()
+        .concurrencyLevel(concurrencyLevel)
+        .initialCapacity(initialCapacity)
+        .maximumWeight(heapFraction(heapMemPercentage))
+        .weigher(new KVWeigher());
+    return buildCache(builder);
+  }
+
+  /**
+   * (Re)Creates the cache with a weight limit of {@link #DEFAULT_HEAP_MEMORY_PERCENTAGE}
+   * of the maximum heap and Guava's default concurrency settings.
+   */
+  public static final synchronized void createDefaultCache()
+  {
+    CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder()
+        .maximumWeight(heapFraction(DEFAULT_HEAP_MEMORY_PERCENTAGE))
+        .weigher(new KVWeigher());
+    // buildCache() installs the new cache as the global one.
+    buildCache(builder);
+  }
+
+  /**
+   * Adds an entry, lazily creating the default cache if none exists yet.
+   */
+  public static final void put(String key, BlockReader blk)
+  {
+    Cache<String, BlockReader> cache = singleCache;
+    if (cache == null) {
+      cache = getOrCreateDefaultCache();
+    }
+    cache.put(key, blk);
+  }
+
+  /**
+   * @return the cached reader for {@code key}, or {@code null} if absent or no cache exists
+   */
+  public static final BlockReader get(String key)
+  {
+    Cache<String, BlockReader> cache = singleCache;
+    return cache == null ? null : cache.getIfPresent(key);
+  }
+
+  /** Removes the given keys from the cache; a no-op if no cache exists. */
+  public static final void invalidateKeys(Collection<String> keys)
+  {
+    Cache<String, BlockReader> cache = singleCache;
+    if (cache != null) {
+      cache.invalidateAll(keys);
+    }
+  }
+
+  /** @return the approximate number of cached entries, or 0 if no cache exists */
+  public static final long getCacheSize()
+  {
+    Cache<String, BlockReader> cache = singleCache;
+    return cache == null ? 0 : cache.size();
+  }
+
+  /**
+   * Estimates an entry's memory footprint as fixed overheads plus the key's encoded
+   * length plus the length of the reader's block buffer.
+   */
+  public static final class KVWeigher implements Weigher<String, BlockReader>
+  {
+
+    @Override
+    public int weigh(String key, BlockReader value)
+    {
+      return (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) +
+          key.getBytes().length +
+          value.getBlockDataInputStream().getBuf().length;
+    }
+
+  }
+
+  @VisibleForTesting
+  protected static Cache<String, BlockReader> getCache()
+  {
+    return singleCache;
+  }
+
+  public static final void setEnableStats(boolean enable)
+  {
+    enableStats = enable;
+  }
+
+  /**
+   * Returns {@code fraction} of the maximum heap size, in bytes.
+   * {@code MemoryUsage.getMax()} reports -1 when the maximum is undefined, which would make
+   * Guava reject the negative weight; fall back to {@code Runtime.maxMemory()} in that case.
+   */
+  private static long heapFraction(float fraction)
+  {
+    long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    if (maxHeap < 0) {
+      maxHeap = Runtime.getRuntime().maxMemory();
+    }
+    return (long)(maxHeap * fraction);
+  }
+
+  /** Creates the default cache under the class lock unless another thread already created one. */
+  private static synchronized Cache<String, BlockReader> getOrCreateDefaultCache()
+  {
+    if (singleCache == null) {
+      createDefaultCache();
+    }
+    return singleCache;
+  }
+
+  /**
+   * Development aid only; does nothing at runtime. The commented-out code estimates the
+   * overhead of the key and value object instances (presumably how STRING_OVERHEAD and
+   * BLOCK_READER_OVERHEAD were derived). It depends on HBase's ClassSize.
+   */
+  public static void main(String[] args)
+  {
+    // System.out.println(ClassSize.estimateBase(BlockReader.class, true) +
+    //     ClassSize.estimateBase(Algorithm.class, true) +
+    //     ClassSize.estimateBase(RBlockState.class, true) +
+    //     ClassSize.estimateBase(ReusableByteArrayInputStream.class, true) +
+    //     ClassSize.estimateBase(BlockRegion.class, true));
+    //
+    // System.out.println(
+    //     ClassSize.estimateBase(String.class, true));
+  }
+
+}