You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:05 UTC
[6/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment
moved DTFile implementation from contrib to lib
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
deleted file mode 100644
index 37bff4b..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import org.junit.Assert;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-public class DTFileTest
-{
- private Configuration conf;
- private Path path;
- private FileSystem fs;
- private NanoTimer timer;
- private Random rng;
- private RandomDistribution.DiscreteRNG keyLenGen;
- private KVGenerator kvGen;
-
-
- static class TestConf {
- public int minWordLen = 5;
- public int maxWordLen = 20;
- public int dictSize = 1000;
- int minKeyLen = 10;
- int maxKeyLen = 50;
- int minValLength = 100;
- int maxValLength = 200;
- int minBlockSize = 64 * 1024;
- int fsOutputBufferSize = 1;
- int fsInputBufferSize = 256 * 1024;
- long fileSize = 3 * 1024 * 1024;
- long seekCount = 1000;
- String compress = "gz";
-
- }
-
- TestConf tconf = new TestConf();
-
- public void setUp() throws IOException
- {
- conf = new Configuration();
-
- conf.setInt("tfile.fs.input.buffer.size", tconf.fsInputBufferSize);
- conf.setInt("tfile.fs.output.buffer.size", tconf.fsOutputBufferSize);
- path = new Path("tmp/dtfile");
- fs = path.getFileSystem(conf);
- timer = new NanoTimer(false);
- rng = new Random();
- keyLenGen =
- new RandomDistribution.Zipf(new Random(rng.nextLong()),
- tconf.minKeyLen, tconf.maxKeyLen, 1.2);
- RandomDistribution.DiscreteRNG valLenGen =
- new RandomDistribution.Flat(new Random(rng.nextLong()),
- tconf.minValLength, tconf.maxValLength);
- RandomDistribution.DiscreteRNG wordLenGen =
- new RandomDistribution.Flat(new Random(rng.nextLong()),
- tconf.minWordLen, tconf.maxWordLen);
- kvGen =
- new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
- tconf.dictSize);
- }
-
-
- private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
- throws IOException {
- if (fs.exists(name)) {
- fs.delete(name, true);
- }
- FSDataOutputStream fout = fs.create(name);
- return fout;
- }
-
- int tuples = 0;
-
- private void writeTFile() throws IOException
- {
-
- FSDataOutputStream fout = createFSOutput(path, fs);
- byte[] key = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(key);
- try {
- DTFile.Writer writer =
- new DTFile.Writer(fout, tconf.minBlockSize, tconf.compress, "memcmp",
- conf);
- try {
- BytesWritable tmpKey = new BytesWritable();
- BytesWritable val = new BytesWritable();
- for (long i = 0; true; ++i) {
- if (i % 1000 == 0) { // test the size for every 1000 rows.
- if (fs.getFileStatus(path).getLen() >= tconf.fileSize) {
- break;
- }
- }
- bb.clear();
- bb.putLong(i);
- kvGen.next(tmpKey, val, false);
- writer.append(key, 0, key.length, val.get(), 0, val
- .getSize());
- tuples++;
- }
- }
- finally {
- writer.close();
- }
- }
- finally {
- fout.close();
- }
-
- long fsize = fs.getFileStatus(path).getLen();
-
- System.out.println("Total tuple wrote " + tuples + " File size " + fsize / (1024.0 * 1024));
- }
-
-
-
- @Test
- public void seekDTFile() throws IOException
- {
- Random random = new Random();
- int ikey = random.nextInt(tuples);
- byte[] key = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(key);
- bb.putLong(ikey);
-
- FSDataInputStream fsdis = fs.open(path);
-
- CacheManager.setEnableStats(true);
- Assert.assertEquals("Cache Contains no block", CacheManager.getCacheSize(), 0);
-
- DTFile.Reader reader = new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
- DTFile.Reader.Scanner scanner = reader.createScanner();
-
- /* Read first key in the file */
- long numBlocks = CacheManager.getCacheSize();
- scanner.lowerBound(key);
- Assert.assertEquals("Cache contains some blocks ", numBlocks + 1, CacheManager.getCacheSize());
-
- /* Next key does not add a new block in cache, it reads directly from cache */
- // close scanner, so that it does not use its own cache.
- scanner.close();
- ikey++;
- bb.clear();
- bb.putLong(ikey);
-
- numBlocks = CacheManager.getCacheSize();
- long hit = CacheManager.getCache().stats().hitCount();
- scanner.lowerBound(key);
- Assert.assertEquals("Cache contains some blocks ", CacheManager.getCacheSize(), numBlocks);
- Assert.assertEquals("Cache hit ", CacheManager.getCache().stats().hitCount(), hit+1);
-
- /* test cache miss */
- scanner.close();
- hit = CacheManager.getCache().stats().hitCount();
- long oldmiss = CacheManager.getCache().stats().missCount();
- ikey = tuples-1;
- bb.clear();
- bb.putLong(ikey);
- numBlocks = CacheManager.getCacheSize();
- scanner.lowerBound(key);
- Assert.assertEquals("Cache contains one more blocks ", CacheManager.getCacheSize(), numBlocks + 1);
- Assert.assertEquals("No cache hit ", CacheManager.getCache().stats().hitCount(), hit);
- Assert.assertEquals("Cache miss", CacheManager.getCache().stats().missCount(), oldmiss + 1);
-
- Assert.assertEquals("Reverse lookup cache and block cache has same number of entries",
- reader.readerBCF.getCacheKeys().size(), CacheManager.getCacheSize());
- reader.close();
- Assert.assertEquals("Cache blocks are deleted on reader close ", CacheManager.getCacheSize(), 0);
- Assert.assertEquals("Size of reverse lookup cache is zero ", 0, reader.readerBCF.getCacheKeys().size());
- }
-
- @Test
- public void checkInvalidKeys()
- {
- /* invalidating non existing key do not throw exception */
- List<String> lst = new LinkedList<String>();
- lst.add("One");
- lst.add("Two");
- CacheManager.getCache().invalidateAll(lst);
- }
-
- @Before
- public void createDTfile() throws IOException
- {
- setUp();
- writeTFile();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
deleted file mode 100644
index 49fedeb..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-
-/**
- * test tfile features.
- *
- */
-public class TestDTFile extends TestCase {
- private static String ROOT =
- System.getProperty("test.build.data", "/tmp/tfile-test");
- private FileSystem fs;
- private Configuration conf;
- private static final int minBlockSize = 512;
- private static final int largeVal = 3 * 1024 * 1024;
- private static final String localFormatter = "%010d";
-
- @Override
- public void setUp() throws IOException {
- conf = new Configuration();
- fs = FileSystem.get(conf);
- }
-
- @Override
- public void tearDown() throws IOException {
- // do nothing
- }
-
- // read a key from the scanner
- public byte[] readKey(Scanner scanner) throws IOException {
- int keylen = scanner.entry().getKeyLength();
- byte[] read = new byte[keylen];
- scanner.entry().getKey(read);
- return read;
- }
-
- // read a value from the scanner
- public byte[] readValue(Scanner scanner) throws IOException {
- int valueLen = scanner.entry().getValueLength();
- byte[] read = new byte[valueLen];
- scanner.entry().getValue(read);
- return read;
- }
-
- // read a long value from the scanner
- public byte[] readLongValue(Scanner scanner, int len) throws IOException {
- DataInputStream din = scanner.entry().getValueStream();
- byte[] b = new byte[len];
- din.readFully(b);
- din.close();
- return b;
- }
-
- // write some records into the tfile
- // write them twice
- private int writeSomeRecords(Writer writer, int start, int n)
- throws IOException {
- String value = "value";
- for (int i = start; i < (start + n); i++) {
- String key = String.format(localFormatter, i);
- writer.append(key.getBytes(), (value + key).getBytes());
- writer.append(key.getBytes(), (value + key).getBytes());
- }
- return (start + n);
- }
-
- // read the records and check
- private int readAndCheckbytes(Scanner scanner, int start, int n)
- throws IOException {
- String value = "value";
- for (int i = start; i < (start + n); i++) {
- byte[] key = readKey(scanner);
- byte[] val = readValue(scanner);
- String keyStr = String.format(localFormatter, i);
- String valStr = value + keyStr;
- assertTrue("btyes for keys do not match " + keyStr + " "
- + new String(key), Arrays.equals(keyStr.getBytes(), key));
- assertTrue("bytes for vals do not match " + valStr + " "
- + new String(val), Arrays.equals(
- valStr.getBytes(), val));
- assertTrue(scanner.advance());
- key = readKey(scanner);
- val = readValue(scanner);
- assertTrue("btyes for keys do not match", Arrays.equals(
- keyStr.getBytes(), key));
- assertTrue("bytes for vals do not match", Arrays.equals(
- valStr.getBytes(), val));
- assertTrue(scanner.advance());
- }
- return (start + n);
- }
-
- // write some large records
- // write them twice
- private int writeLargeRecords(Writer writer, int start, int n)
- throws IOException {
- byte[] value = new byte[largeVal];
- for (int i = start; i < (start + n); i++) {
- String key = String.format(localFormatter, i);
- writer.append(key.getBytes(), value);
- writer.append(key.getBytes(), value);
- }
- return (start + n);
- }
-
- // read large records
- // read them twice since its duplicated
- private int readLargeRecords(Scanner scanner, int start, int n)
- throws IOException {
- for (int i = start; i < (start + n); i++) {
- byte[] key = readKey(scanner);
- String keyStr = String.format(localFormatter, i);
- assertTrue("btyes for keys do not match", Arrays.equals(
- keyStr.getBytes(), key));
- scanner.advance();
- key = readKey(scanner);
- assertTrue("btyes for keys do not match", Arrays.equals(
- keyStr.getBytes(), key));
- scanner.advance();
- }
- return (start + n);
- }
-
- // write empty keys and values
- private void writeEmptyRecords(Writer writer, int n) throws IOException {
- byte[] key = new byte[0];
- byte[] value = new byte[0];
- for (int i = 0; i < n; i++) {
- writer.append(key, value);
- }
- }
-
- // read empty keys and values
- private void readEmptyRecords(Scanner scanner, int n) throws IOException {
- byte[] key = new byte[0];
- byte[] value = new byte[0];
- byte[] readKey = null;
- byte[] readValue = null;
- for (int i = 0; i < n; i++) {
- readKey = readKey(scanner);
- readValue = readValue(scanner);
- assertTrue("failed to match keys", Arrays.equals(readKey, key));
- assertTrue("failed to match values", Arrays.equals(readValue, value));
- assertTrue("failed to advance cursor", scanner.advance());
- }
- }
-
- private int writePrepWithKnownLength(Writer writer, int start, int n)
- throws IOException {
- // get the length of the key
- String key = String.format(localFormatter, start);
- int keyLen = key.getBytes().length;
- String value = "value" + key;
- int valueLen = value.getBytes().length;
- for (int i = start; i < (start + n); i++) {
- DataOutputStream out = writer.prepareAppendKey(keyLen);
- String localKey = String.format(localFormatter, i);
- out.write(localKey.getBytes());
- out.close();
- out = writer.prepareAppendValue(valueLen);
- String localValue = "value" + localKey;
- out.write(localValue.getBytes());
- out.close();
- }
- return (start + n);
- }
-
- private int readPrepWithKnownLength(Scanner scanner, int start, int n)
- throws IOException {
- for (int i = start; i < (start + n); i++) {
- String key = String.format(localFormatter, i);
- byte[] read = readKey(scanner);
- assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
- String value = "value" + key;
- read = readValue(scanner);
- assertTrue("values not equal", Arrays.equals(value.getBytes(), read));
- scanner.advance();
- }
- return (start + n);
- }
-
- private int writePrepWithUnkownLength(Writer writer, int start, int n)
- throws IOException {
- for (int i = start; i < (start + n); i++) {
- DataOutputStream out = writer.prepareAppendKey(-1);
- String localKey = String.format(localFormatter, i);
- out.write(localKey.getBytes());
- out.close();
- String value = "value" + localKey;
- out = writer.prepareAppendValue(-1);
- out.write(value.getBytes());
- out.close();
- }
- return (start + n);
- }
-
- private int readPrepWithUnknownLength(Scanner scanner, int start, int n)
- throws IOException {
- for (int i = start; i < start; i++) {
- String key = String.format(localFormatter, i);
- byte[] read = readKey(scanner);
- assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
- try {
- read = readValue(scanner);
- assertTrue(false);
- }
- catch (IOException ie) {
- // should have thrown exception
- }
- String value = "value" + key;
- read = readLongValue(scanner, value.getBytes().length);
- assertTrue("values nto equal", Arrays.equals(read, value.getBytes()));
- scanner.advance();
- }
- return (start + n);
- }
-
- private byte[] getSomeKey(int rowId) {
- return String.format(localFormatter, rowId).getBytes();
- }
-
- private void writeRecords(Writer writer) throws IOException {
- writeEmptyRecords(writer, 10);
- int ret = writeSomeRecords(writer, 0, 100);
- ret = writeLargeRecords(writer, ret, 1);
- ret = writePrepWithKnownLength(writer, ret, 40);
- ret = writePrepWithUnkownLength(writer, ret, 50);
- writer.close();
- }
-
- private void readAllRecords(Scanner scanner) throws IOException {
- readEmptyRecords(scanner, 10);
- int ret = readAndCheckbytes(scanner, 0, 100);
- ret = readLargeRecords(scanner, ret, 1);
- ret = readPrepWithKnownLength(scanner, ret, 40);
- ret = readPrepWithUnknownLength(scanner, ret, 50);
- }
-
- private FSDataOutputStream createFSOutput(Path name) throws IOException {
- if (fs.exists(name)) fs.delete(name, true);
- FSDataOutputStream fout = fs.create(name);
- return fout;
- }
-
- /**
- * test none codecs
- */
- void basicWithSomeCodec(String codec) throws IOException {
- Path ncTFile = new Path(ROOT, "basic.tfile");
- FSDataOutputStream fout = createFSOutput(ncTFile);
- Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf);
- writeRecords(writer);
- fout.close();
- FSDataInputStream fin = fs.open(ncTFile);
- Reader reader =
- new Reader(fs.open(ncTFile), fs.getFileStatus(ncTFile).getLen(), conf);
-
- Scanner scanner = reader.createScanner();
- readAllRecords(scanner);
- scanner.seekTo(getSomeKey(50));
- assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50)));
- // read the key and see if it matches
- byte[] readKey = readKey(scanner);
- assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50),
- readKey));
-
- scanner.seekTo(new byte[0]);
- byte[] val1 = readValue(scanner);
- scanner.seekTo(new byte[0]);
- byte[] val2 = readValue(scanner);
- assertTrue(Arrays.equals(val1, val2));
-
- // check for lowerBound
- scanner.lowerBound(getSomeKey(50));
- assertTrue("locaton lookup failed", scanner.currentLocation
- .compareTo(reader.end()) < 0);
- readKey = readKey(scanner);
- assertTrue("seeked key does not match", Arrays.equals(readKey,
- getSomeKey(50)));
-
- // check for upper bound
- scanner.upperBound(getSomeKey(50));
- assertTrue("location lookup failed", scanner.currentLocation
- .compareTo(reader.end()) < 0);
- readKey = readKey(scanner);
- assertTrue("seeked key does not match", Arrays.equals(readKey,
- getSomeKey(51)));
-
- scanner.close();
- // test for a range of scanner
- scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
- readAndCheckbytes(scanner, 10, 50);
- assertFalse(scanner.advance());
- scanner.close();
- reader.close();
- fin.close();
- fs.delete(ncTFile, true);
- }
-
- // unsorted with some codec
- void unsortedWithSomeCodec(String codec) throws IOException {
- Path uTfile = new Path(ROOT, "unsorted.tfile");
- FSDataOutputStream fout = createFSOutput(uTfile);
- Writer writer = new Writer(fout, minBlockSize, codec, null, conf);
- writeRecords(writer);
- writer.close();
- fout.close();
- FSDataInputStream fin = fs.open(uTfile);
- Reader reader =
- new Reader(fs.open(uTfile), fs.getFileStatus(uTfile).getLen(), conf);
-
- Scanner scanner = reader.createScanner();
- readAllRecords(scanner);
- scanner.close();
- reader.close();
- fin.close();
- fs.delete(uTfile, true);
- }
-
- public void testTFileFeatures() throws IOException {
- basicWithSomeCodec("none");
- basicWithSomeCodec("gz");
- }
-
- // test unsorted t files.
- public void testUnsortedTFileFeatures() throws IOException {
- unsortedWithSomeCodec("none");
- unsortedWithSomeCodec("gz");
- }
-
- private void writeNumMetablocks(Writer writer, String compression, int n)
- throws IOException {
- for (int i = 0; i < n; i++) {
- DataOutputStream dout =
- writer.prepareMetaBlock("TfileMeta" + i, compression);
- byte[] b = ("something to test" + i).getBytes();
- dout.write(b);
- dout.close();
- }
- }
-
- private void someTestingWithMetaBlock(Writer writer, String compression)
- throws IOException {
- DataOutputStream dout = null;
- writeNumMetablocks(writer, compression, 10);
- try {
- dout = writer.prepareMetaBlock("TfileMeta1", compression);
- assertTrue(false);
- }
- catch (MetaBlockAlreadyExists me) {
- // avoid this exception
- }
- dout = writer.prepareMetaBlock("TFileMeta100", compression);
- dout.close();
- }
-
- private void readNumMetablocks(Reader reader, int n) throws IOException {
- int len = ("something to test" + 0).getBytes().length;
- for (int i = 0; i < n; i++) {
- DataInputStream din = reader.getMetaBlock("TfileMeta" + i);
- byte b[] = new byte[len];
- din.readFully(b);
- assertTrue("faield to match metadata", Arrays.equals(
- ("something to test" + i).getBytes(), b));
- din.close();
- }
- }
-
- private void someReadingWithMetaBlock(Reader reader) throws IOException {
- DataInputStream din = null;
- readNumMetablocks(reader, 10);
- try {
- din = reader.getMetaBlock("NO ONE");
- assertTrue(false);
- }
- catch (MetaBlockDoesNotExist me) {
- // should catch
- }
- din = reader.getMetaBlock("TFileMeta100");
- int read = din.read();
- assertTrue("check for status", (read == -1));
- din.close();
- }
-
- // test meta blocks for tfiles
- public void _testMetaBlocks() throws IOException {
- Path mFile = new Path(ROOT, "meta.tfile");
- FSDataOutputStream fout = createFSOutput(mFile);
- Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
- someTestingWithMetaBlock(writer, "none");
- writer.close();
- fout.close();
- FSDataInputStream fin = fs.open(mFile);
- Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
- someReadingWithMetaBlock(reader);
- fs.delete(mFile, true);
- reader.close();
- fin.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
deleted file mode 100644
index a1fa5c8..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Location;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- *
- */
-public class TestDTFileByteArrays {
- private static String ROOT =
- System.getProperty("test.build.data", "/tmp/tfile-test");
- private final static int BLOCK_SIZE = 512;
- private final static int BUF_SIZE = 64;
- private final static int K = 1024;
- protected boolean skip = false;
-
- private static final String KEY = "key";
- private static final String VALUE = "value";
-
- private FileSystem fs;
- private Configuration conf = new Configuration();
- private Path path;
- private FSDataOutputStream out;
- private Writer writer;
-
- private String compression = Compression.Algorithm.GZ.getName();
- private String comparator = "memcmp";
- private final String outputFile = getClass().getSimpleName();
-
- /*
- * pre-sampled numbers of records in one block, based on the given the
- * generated key and value strings. This is slightly different based on
- * whether or not the native libs are present.
- */
- private boolean usingNative = ZlibFactory.isNativeZlibLoaded(conf);
- private int records1stBlock = usingNative ? 5674 : 4480;
- private int records2ndBlock = usingNative ? 5574 : 4263;
-
- public void init(String compression, String comparator,
- int numRecords1stBlock, int numRecords2ndBlock) {
- init(compression, comparator);
- this.records1stBlock = numRecords1stBlock;
- this.records2ndBlock = numRecords2ndBlock;
- }
-
- public void init(String compression, String comparator) {
- this.compression = compression;
- this.comparator = comparator;
- }
-
- @Before
- public void setUp() throws IOException {
- path = new Path(ROOT, outputFile);
- fs = path.getFileSystem(conf);
- out = fs.create(path);
- writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
- }
-
- @After
- public void tearDown() throws IOException {
- if (!skip)
- fs.delete(path, true);
- }
-
- @Test
- public void testNoDataEntry() throws IOException {
- if (skip)
- return;
- closeOutput();
-
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Assert.assertTrue(reader.isSorted());
- Scanner scanner = reader.createScanner();
- Assert.assertTrue(scanner.atEnd());
- scanner.close();
- reader.close();
- }
-
- @Test
- public void testOneDataEntry() throws IOException {
- if (skip)
- return;
- writeRecords(1);
- readRecords(1);
-
- checkBlockIndex(0, 0);
- readValueBeforeKey(0);
- readKeyWithoutValue(0);
- readValueWithoutKey(0);
- readKeyManyTimes(0);
- }
-
- @Test
- public void testTwoDataEntries() throws IOException {
- if (skip)
- return;
- writeRecords(2);
- readRecords(2);
- }
-
- /**
- * Fill up exactly one block.
- *
- * @throws IOException
- */
- @Test
- public void testOneBlock() throws IOException {
- if (skip)
- return;
- // just under one block
- writeRecords(records1stBlock);
- readRecords(records1stBlock);
- // last key should be in the first block (block 0)
- checkBlockIndex(records1stBlock - 1, 0);
- }
-
- /**
- * One block plus one record.
- *
- * @throws IOException
- */
- @Test
- public void testOneBlockPlusOneEntry() throws IOException {
- if (skip)
- return;
- writeRecords(records1stBlock + 1);
- readRecords(records1stBlock + 1);
- checkBlockIndex(records1stBlock - 1, 0);
- checkBlockIndex(records1stBlock, 1);
- }
-
- @Test
- public void testTwoBlocks() throws IOException {
- if (skip)
- return;
- writeRecords(records1stBlock + 5);
- readRecords(records1stBlock + 5);
- checkBlockIndex(records1stBlock + 4, 1);
- }
-
- @Test
- public void testThreeBlocks() throws IOException {
- if (skip)
- return;
- writeRecords(2 * records1stBlock + 5);
- readRecords(2 * records1stBlock + 5);
-
- checkBlockIndex(2 * records1stBlock + 4, 2);
- // 1st key in file
- readValueBeforeKey(0);
- readKeyWithoutValue(0);
- readValueWithoutKey(0);
- readKeyManyTimes(0);
- // last key in file
- readValueBeforeKey(2 * records1stBlock + 4);
- readKeyWithoutValue(2 * records1stBlock + 4);
- readValueWithoutKey(2 * records1stBlock + 4);
- readKeyManyTimes(2 * records1stBlock + 4);
-
- // 1st key in mid block, verify block indexes then read
- checkBlockIndex(records1stBlock - 1, 0);
- checkBlockIndex(records1stBlock, 1);
- readValueBeforeKey(records1stBlock);
- readKeyWithoutValue(records1stBlock);
- readValueWithoutKey(records1stBlock);
- readKeyManyTimes(records1stBlock);
-
- // last key in mid block, verify block indexes then read
- checkBlockIndex(records1stBlock + records2ndBlock
- - 1, 1);
- checkBlockIndex(records1stBlock + records2ndBlock, 2);
- readValueBeforeKey(records1stBlock
- + records2ndBlock - 1);
- readKeyWithoutValue(records1stBlock
- + records2ndBlock - 1);
- readValueWithoutKey(records1stBlock
- + records2ndBlock - 1);
- readKeyManyTimes(records1stBlock + records2ndBlock
- - 1);
-
- // mid in mid block
- readValueBeforeKey(records1stBlock + 10);
- readKeyWithoutValue(records1stBlock + 10);
- readValueWithoutKey(records1stBlock + 10);
- readKeyManyTimes(records1stBlock + 10);
- }
-
- Location locate(Scanner scanner, byte[] key) throws IOException {
- if (scanner.seekTo(key) == true) {
- return scanner.currentLocation;
- }
- return scanner.endLocation;
- }
-
- @Test
- public void testLocate() throws IOException {
- if (skip)
- return;
- writeRecords(3 * records1stBlock);
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
- locate(scanner, composeSortedKey(KEY, 2).getBytes());
- locate(scanner, composeSortedKey(KEY, records1stBlock - 1).getBytes());
- locate(scanner, composeSortedKey(KEY, records1stBlock).getBytes());
- Location locX = locate(scanner, "keyX".getBytes());
- Assert.assertEquals(scanner.endLocation, locX);
- scanner.close();
- reader.close();
- }
-
- @Test
- public void testFailureWriterNotClosed() throws IOException {
- if (skip)
- return;
- Reader reader = null;
- try {
- reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Assert.fail("Cannot read before closing the writer.");
- } catch (IOException e) {
- // noop, expecting exceptions
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
-
- @Test
- public void testFailureWriteMetaBlocksWithSameName() throws IOException {
- if (skip)
- return;
- writer.append("keyX".getBytes(), "valueX".getBytes());
-
- // create a new metablock
- DataOutputStream outMeta =
- writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
- outMeta.write(123);
- outMeta.write("foo".getBytes());
- outMeta.close();
- // add the same metablock
- try {
- writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
- Assert.fail("Cannot create metablocks with the same name.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- @Test
- public void testFailureGetNonExistentMetaBlock() throws IOException {
- if (skip)
- return;
- writer.append("keyX".getBytes(), "valueX".getBytes());
-
- // create a new metablock
- DataOutputStream outMeta =
- writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
- outMeta.write(123);
- outMeta.write("foo".getBytes());
- outMeta.close();
- closeOutput();
-
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- DataInputStream mb = reader.getMetaBlock("testX");
- Assert.assertNotNull(mb);
- mb.close();
- try {
- DataInputStream mbBad = reader.getMetaBlock("testY");
- Assert.fail("Error on handling non-existent metablocks.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- reader.close();
- }
-
- @Test
- public void testFailureWriteRecordAfterMetaBlock() throws IOException {
- if (skip)
- return;
- // write a key/value first
- writer.append("keyX".getBytes(), "valueX".getBytes());
- // create a new metablock
- DataOutputStream outMeta =
- writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
- outMeta.write(123);
- outMeta.write("dummy".getBytes());
- outMeta.close();
- // add more key/value
- try {
- writer.append("keyY".getBytes(), "valueY".getBytes());
- Assert.fail("Cannot add key/value after start adding meta blocks.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- @Test
- public void testFailureReadValueManyTimes() throws IOException {
- if (skip)
- return;
- writeRecords(5);
-
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
-
- byte[] vbuf = new byte[BUF_SIZE];
- int vlen = scanner.entry().getValueLength();
- scanner.entry().getValue(vbuf);
- Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0);
- try {
- scanner.entry().getValue(vbuf);
- Assert.fail("Cannot get the value mlutiple times.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
-
- scanner.close();
- reader.close();
- }
-
- @Test
- public void testFailureBadCompressionCodec() throws IOException {
- if (skip)
- return;
- closeOutput();
- out = fs.create(path);
- try {
- writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
- Assert.fail("Error on handling invalid compression codecs.");
- } catch (Exception e) {
- // noop, expecting exceptions
- // e.printStackTrace();
- }
- }
-
- @Test
- public void testFailureOpenEmptyFile() throws IOException {
- if (skip)
- return;
- closeOutput();
- // create an absolutely empty file
- path = new Path(fs.getWorkingDirectory(), outputFile);
- out = fs.create(path);
- out.close();
- try {
- new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Assert.fail("Error on handling empty files.");
- } catch (EOFException e) {
- // noop, expecting exceptions
- }
- }
-
- @Test
- public void testFailureOpenRandomFile() throws IOException {
- if (skip)
- return;
- closeOutput();
- // create an random file
- path = new Path(fs.getWorkingDirectory(), outputFile);
- out = fs.create(path);
- Random rand = new Random();
- byte[] buf = new byte[K];
- // fill with > 1MB data
- for (int nx = 0; nx < K + 2; nx++) {
- rand.nextBytes(buf);
- out.write(buf);
- }
- out.close();
- try {
- new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Assert.fail("Error on handling random files.");
- } catch (IOException e) {
- // noop, expecting exceptions
- }
- }
-
- @Test
- public void testFailureKeyLongerThan64K() throws IOException {
- if (skip)
- return;
- byte[] buf = new byte[64 * K + 1];
- Random rand = new Random();
- rand.nextBytes(buf);
- try {
- writer.append(buf, "valueX".getBytes());
- } catch (IndexOutOfBoundsException e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- @Test
- public void testFailureOutOfOrderKeys() throws IOException {
- if (skip)
- return;
- try {
- writer.append("keyM".getBytes(), "valueM".getBytes());
- writer.append("keyA".getBytes(), "valueA".getBytes());
- Assert.fail("Error on handling out of order keys.");
- } catch (Exception e) {
- // noop, expecting exceptions
- // e.printStackTrace();
- }
-
- closeOutput();
- }
-
- @Test
- public void testFailureNegativeOffset() throws IOException {
- if (skip)
- return;
- try {
- writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
- Assert.fail("Error on handling negative offset.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- @Test
- public void testFailureNegativeOffset_2() throws IOException {
- if (skip)
- return;
- closeOutput();
-
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
- try {
- scanner.lowerBound("keyX".getBytes(), -1, 4);
- Assert.fail("Error on handling negative offset.");
- } catch (Exception e) {
- // noop, expecting exceptions
- } finally {
- reader.close();
- scanner.close();
- }
- closeOutput();
- }
-
- @Test
- public void testFailureNegativeLength() throws IOException {
- if (skip)
- return;
- try {
- writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
- Assert.fail("Error on handling negative length.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- @Test
- public void testFailureNegativeLength_2() throws IOException {
- if (skip)
- return;
- closeOutput();
-
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
- try {
- scanner.lowerBound("keyX".getBytes(), 0, -1);
- Assert.fail("Error on handling negative length.");
- } catch (Exception e) {
- // noop, expecting exceptions
- } finally {
- scanner.close();
- reader.close();
- }
- closeOutput();
- }
-
- @Test
- public void testFailureNegativeLength_3() throws IOException {
- if (skip)
- return;
- writeRecords(3);
-
- Reader reader =
- new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
- try {
- // test negative array offset
- try {
- scanner.seekTo("keyY".getBytes(), -1, 4);
- Assert.fail("Failed to handle negative offset.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
-
- // test negative array length
- try {
- scanner.seekTo("keyY".getBytes(), 0, -2);
- Assert.fail("Failed to handle negative key length.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- } finally {
- reader.close();
- scanner.close();
- }
- }
-
- @Test
- public void testFailureCompressionNotWorking() throws IOException {
- if (skip)
- return;
- long rawDataSize = writeRecords(10 * records1stBlock, false);
- if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
- Assert.assertTrue(out.getPos() < rawDataSize);
- }
- closeOutput();
- }
-
- @Test
- public void testFailureFileWriteNotAt0Position() throws IOException {
- if (skip)
- return;
- closeOutput();
- out = fs.create(path);
- out.write(123);
-
- try {
- writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
- Assert.fail("Failed to catch file write not at position 0.");
- } catch (Exception e) {
- // noop, expecting exceptions
- }
- closeOutput();
- }
-
- private long writeRecords(int count) throws IOException {
- return writeRecords(count, true);
- }
-
- private long writeRecords(int count, boolean close) throws IOException {
- long rawDataSize = writeRecords(writer, count);
- if (close) {
- closeOutput();
- }
- return rawDataSize;
- }
-
- static long writeRecords(Writer writer, int count) throws IOException {
- long rawDataSize = 0;
- int nx;
- for (nx = 0; nx < count; nx++) {
- byte[] key = composeSortedKey(KEY, nx).getBytes();
- byte[] value = (VALUE + nx).getBytes();
- writer.append(key, value);
- rawDataSize +=
- WritableUtils.getVIntSize(key.length) + key.length
- + WritableUtils.getVIntSize(value.length) + value.length;
- }
- return rawDataSize;
- }
-
- /**
- * Insert some leading 0's in front of the value, to make the keys sorted.
- *
- * @param prefix prefix
- * @param value value
- * @return sorted key
- */
- static String composeSortedKey(String prefix, int value) {
- return String.format("%s%010d", prefix, value);
- }
-
- private void readRecords(int count) throws IOException {
- readRecords(fs, path, count, conf);
- }
-
- static void readRecords(FileSystem fs, Path path, int count,
- Configuration conf) throws IOException {
- Reader reader =
- new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
-
- try {
- for (int nx = 0; nx < count; nx++, scanner.advance()) {
- Assert.assertFalse(scanner.atEnd());
- // Assert.assertTrue(scanner.next());
-
- byte[] kbuf = new byte[BUF_SIZE];
- int klen = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf);
- Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
- nx));
-
- byte[] vbuf = new byte[BUF_SIZE];
- int vlen = scanner.entry().getValueLength();
- scanner.entry().getValue(vbuf);
- Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx);
- }
-
- Assert.assertTrue(scanner.atEnd());
- Assert.assertFalse(scanner.advance());
- } finally {
- scanner.close();
- reader.close();
- }
- }
-
- private void checkBlockIndex(int recordIndex, int blockIndexExpected) throws IOException {
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner = reader.createScanner();
- scanner.seekTo(composeSortedKey(KEY, recordIndex).getBytes());
- Assert.assertEquals(blockIndexExpected, scanner.currentLocation
- .getBlockIndex());
- scanner.close();
- reader.close();
- }
-
- private void readValueBeforeKey(int recordIndex)
- throws IOException {
- Reader reader =
- new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner =
- reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
- .getBytes(), null);
-
- try {
- byte[] vbuf = new byte[BUF_SIZE];
- int vlen = scanner.entry().getValueLength();
- scanner.entry().getValue(vbuf);
- Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex);
-
- byte[] kbuf = new byte[BUF_SIZE];
- int klen = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf);
- Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
- recordIndex));
- } finally {
- scanner.close();
- reader.close();
- }
- }
-
- private void readKeyWithoutValue(int recordIndex)
- throws IOException {
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
- Scanner scanner =
- reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
- .getBytes(), null);
-
- try {
- // read the indexed key
- byte[] kbuf1 = new byte[BUF_SIZE];
- int klen1 = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf1);
- Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
- recordIndex));
-
- if (scanner.advance() && !scanner.atEnd()) {
- // read the next key following the indexed
- byte[] kbuf2 = new byte[BUF_SIZE];
- int klen2 = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf2);
- Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY,
- recordIndex + 1));
- }
- } finally {
- scanner.close();
- reader.close();
- }
- }
-
- private void readValueWithoutKey(int recordIndex)
- throws IOException {
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-
- Scanner scanner =
- reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
- .getBytes(), null);
-
- byte[] vbuf1 = new byte[BUF_SIZE];
- int vlen1 = scanner.entry().getValueLength();
- scanner.entry().getValue(vbuf1);
- Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex);
-
- if (scanner.advance() && !scanner.atEnd()) {
- byte[] vbuf2 = new byte[BUF_SIZE];
- int vlen2 = scanner.entry().getValueLength();
- scanner.entry().getValue(vbuf2);
- Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE
- + (recordIndex + 1));
- }
-
- scanner.close();
- reader.close();
- }
-
- private void readKeyManyTimes(int recordIndex) throws IOException {
- Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
-
- Scanner scanner =
- reader.createScannerByKey(composeSortedKey(KEY, recordIndex)
- .getBytes(), null);
-
- // read the indexed key
- byte[] kbuf1 = new byte[BUF_SIZE];
- int klen1 = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf1);
- Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
- recordIndex));
-
- klen1 = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf1);
- Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
- recordIndex));
-
- klen1 = scanner.entry().getKeyLength();
- scanner.entry().getKey(kbuf1);
- Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
- recordIndex));
-
- scanner.close();
- reader.close();
- }
-
- private void closeOutput() throws IOException {
- if (writer != null) {
- writer.close();
- writer = null;
- }
- if (out != null) {
- out.close();
- out = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
deleted file mode 100644
index c313813..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestTFileComparator2 {
- private static final String ROOT = System.getProperty("test.build.data",
- "/tmp/tfile-test");
- private static final String name = "test-tfile-comparator2";
- private final static int BLOCK_SIZE = 512;
- private static final String VALUE = "value";
- private static final String jClassLongWritableComparator = "jclass:"
- + LongWritable.Comparator.class.getName();
- private static final long NENTRY = 10000;
-
- private static long cube(long n) {
- return n*n*n;
- }
-
- private static String buildValue(long i) {
- return String.format("%s-%d", VALUE, i);
- }
-
- @Test
- public void testSortedLongWritable() throws IOException {
- Configuration conf = new Configuration();
- Path path = new Path(ROOT, name);
- FileSystem fs = path.getFileSystem(conf);
- FSDataOutputStream out = fs.create(path);
- try {
- DTFile.Writer writer = new Writer(out, BLOCK_SIZE, "gz",
- jClassLongWritableComparator, conf);
- try {
- LongWritable key = new LongWritable(0);
- for (long i=0; i<NENTRY; ++i) {
- key.set(cube(i-NENTRY/2));
- DataOutputStream dos = writer.prepareAppendKey(-1);
- try {
- key.write(dos);
- } finally {
- dos.close();
- }
- dos = writer.prepareAppendValue(-1);
- try {
- dos.write(buildValue(i).getBytes());
- } finally {
- dos.close();
- }
- }
- } finally {
- writer.close();
- }
- } finally {
- out.close();
- }
-
- FSDataInputStream in = fs.open(path);
- try {
- DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path)
- .getLen(), conf);
- try {
- DTFile.Reader.Scanner scanner = reader.createScanner();
- long i=0;
- BytesWritable value = new BytesWritable();
- for (; !scanner.atEnd(); scanner.advance()) {
- scanner.entry().getValue(value);
- assertEquals(buildValue(i), new String(value.getBytes(), 0, value
- .getLength()));
- ++i;
- }
- } finally {
- reader.close();
- }
- } finally {
- in.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
deleted file mode 100644
index 0a10468..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.junit.Assert;
-
-/**
- *
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- *
- */
-public class TestTFileComparators extends TestCase {
- private static String ROOT =
- System.getProperty("test.build.data", "/tmp/tfile-test");
-
- private final static int BLOCK_SIZE = 512;
- private FileSystem fs;
- private Configuration conf;
- private Path path;
- private FSDataOutputStream out;
- private Writer writer;
-
- private String compression = Compression.Algorithm.GZ.getName();
- private String outputFile = "TFileTestComparators";
- /*
- * pre-sampled numbers of records in one block, based on the given the
- * generated key and value strings
- */
- // private int records1stBlock = 4314;
- // private int records2ndBlock = 4108;
- private int records1stBlock = 4480;
- private int records2ndBlock = 4263;
-
- @Override
- public void setUp() throws IOException {
- conf = new Configuration();
- path = new Path(ROOT, outputFile);
- fs = path.getFileSystem(conf);
- out = fs.create(path);
- }
-
- @Override
- public void tearDown() throws IOException {
- fs.delete(path, true);
- }
-
- // bad comparator format
- public void testFailureBadComparatorNames() throws IOException {
- try {
- writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf);
- Assert.fail("Failed to catch unsupported comparator names");
- }
- catch (Exception e) {
- // noop, expecting exceptions
- e.printStackTrace();
- }
- }
-
- // jclass that doesn't exist
- public void testFailureBadJClassNames() throws IOException {
- try {
- writer =
- new Writer(out, BLOCK_SIZE, compression,
- "jclass: some.non.existence.clazz", conf);
- Assert.fail("Failed to catch unsupported comparator names");
- }
- catch (Exception e) {
- // noop, expecting exceptions
- e.printStackTrace();
- }
- }
-
- // class exists but not a RawComparator
- public void testFailureBadJClasses() throws IOException {
- try {
- writer =
- new Writer(out, BLOCK_SIZE, compression,
- "jclass:org.apache.hadoop.io.file.tfile.Chunk", conf);
- Assert.fail("Failed to catch unsupported comparator names");
- }
- catch (Exception e) {
- // noop, expecting exceptions
- e.printStackTrace();
- }
- }
-
- private void closeOutput() throws IOException {
- if (writer != null) {
- writer.close();
- writer = null;
- }
- if (out != null) {
- out.close();
- out = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
deleted file mode 100644
index 301cffc..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- *
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- *
- */
-
-public class TestTFileJClassComparatorByteArrays extends TestDTFileByteArrays {
- /**
- * Test non-compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- init(Compression.Algorithm.GZ.getName(),
- "jclass: org.apache.hadoop.io.file.tfile.MyComparator");
- super.setUp();
- }
-}
-
-class MyComparator implements RawComparator<byte[]>, Serializable {
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
deleted file mode 100644
index 20cff9e..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-public class TestTFileLzoCodecsByteArrays extends TestDTFileByteArrays {
- /**
- * Test LZO compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- skip = !(Algorithm.LZO.isSupported());
- if (skip) {
- System.out.println("Skipped");
- }
-
- // TODO: sample the generated key/value records, and put the numbers below
- init(Compression.Algorithm.LZO.getName(), "memcmp", 2605, 2558);
- if (!skip)
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
deleted file mode 100644
index 7c6581d..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-public class TestTFileLzoCodecsStreams extends TestTFileStreams {
- /**
- * Test LZO compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- skip = !(Algorithm.LZO.isSupported());
- if (skip) {
- System.out.println("Skipped");
- }
- init(Compression.Algorithm.LZO.getName(), "memcmp");
- if (!skip)
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
deleted file mode 100644
index c304743..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-public class TestTFileNoneCodecsByteArrays extends TestDTFileByteArrays {
- /**
- * Test non-compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- init(Compression.Algorithm.NONE.getName(), "memcmp", 24, 24);
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
deleted file mode 100644
index 31e3cad..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-/**
- *
- * Byte arrays test case class using GZ compression codec, base class of none
- * and LZO compression classes.
- *
- */
-
-public class TestTFileNoneCodecsJClassComparatorByteArrays extends TestDTFileByteArrays {
- /**
- * Test non-compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- init(Compression.Algorithm.NONE.getName(),
- "jclass: org.apache.hadoop.io.file.tfile.MyComparator", 24, 24);
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
deleted file mode 100644
index 06d086b..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-
-public class TestTFileNoneCodecsStreams extends TestTFileStreams {
- /**
- * Test non-compression codec, using the same test cases as in the ByteArrays.
- */
- @Override
- public void setUp() throws IOException {
- init(Compression.Algorithm.NONE.getName(), "memcmp");
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
deleted file mode 100644
index 9f6b3ce..0000000
--- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Writer;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-
-/**
- * test the performance for seek.
- *
- */
-public class TestTFileSeek extends TestCase {
- private MyOptions options;
- private Configuration conf;
- private Path path;
- private FileSystem fs;
- private NanoTimer timer;
- private Random rng;
- private DiscreteRNG keyLenGen;
- private KVGenerator kvGen;
-
- @Override
- public void setUp() throws IOException {
- if (options == null) {
- options = new MyOptions(new String[0]);
- }
-
- conf = new Configuration();
- conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
- conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
- path = new Path(new Path(options.rootDir), options.file);
- fs = path.getFileSystem(conf);
- timer = new NanoTimer(false);
- rng = new Random(options.seed);
- keyLenGen =
- new RandomDistribution.Zipf(new Random(rng.nextLong()),
- options.minKeyLen, options.maxKeyLen, 1.2);
- DiscreteRNG valLenGen =
- new RandomDistribution.Flat(new Random(rng.nextLong()),
- options.minValLength, options.maxValLength);
- DiscreteRNG wordLenGen =
- new RandomDistribution.Flat(new Random(rng.nextLong()),
- options.minWordLen, options.maxWordLen);
- kvGen =
- new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
- options.dictSize);
- }
-
- @Override
- public void tearDown() throws IOException {
- fs.delete(path, true);
- }
-
- private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
- throws IOException {
- if (fs.exists(name)) {
- fs.delete(name, true);
- }
- FSDataOutputStream fout = fs.create(name);
- return fout;
- }
-
- private void createTFile() throws IOException {
- long totalBytes = 0;
- FSDataOutputStream fout = createFSOutput(path, fs);
- try {
- Writer writer =
- new Writer(fout, options.minBlockSize, options.compress, "memcmp",
- conf);
- try {
- BytesWritable key = new BytesWritable();
- BytesWritable val = new BytesWritable();
- timer.start();
- for (long i = 0; true; ++i) {
- if (i % 1000 == 0) { // test the size for every 1000 rows.
- if (fs.getFileStatus(path).getLen() >= options.fileSize) {
- break;
- }
- }
- kvGen.next(key, val, false);
- writer.append(key.get(), 0, key.getSize(), val.get(), 0, val
- .getSize());
- totalBytes += key.getSize();
- totalBytes += val.getSize();
- }
- timer.stop();
- }
- finally {
- writer.close();
- }
- }
- finally {
- fout.close();
- }
- double duration = (double)timer.read()/1000; // in us.
- long fsize = fs.getFileStatus(path).getLen();
-
- System.out.printf(
- "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
- timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
- / duration);
- System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
- timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
- }
-
- public void seekTFile() throws IOException {
- int miss = 0;
- long totalBytes = 0;
- FSDataInputStream fsdis = fs.open(path);
- Reader reader =
- new Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
- KeySampler kSampler =
- new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
- keyLenGen);
- Scanner scanner = reader.createScanner();
- BytesWritable key = new BytesWritable();
- BytesWritable val = new BytesWritable();
- timer.reset();
- timer.start();
- for (int i = 0; i < options.seekCount; ++i) {
- kSampler.next(key);
- scanner.lowerBound(key.get(), 0, key.getSize());
- if (!scanner.atEnd()) {
- scanner.entry().get(key, val);
- totalBytes += key.getSize();
- totalBytes += val.getSize();
- }
- else {
- ++miss;
- }
- }
- timer.stop();
- double duration = (double) timer.read() / 1000; // in us.
- System.out.printf(
- "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
- timer.toString(), NanoTimer.nanoTimeToString(timer.read()
- / options.seekCount), options.seekCount - miss, miss,
- (double) totalBytes / 1024 / (options.seekCount - miss));
-
- }
-
- public void testSeeks() throws IOException {
- String[] supported = TFile.getSupportedCompressionAlgorithms();
- boolean proceed = false;
- for (String c : supported) {
- if (c.equals(options.compress)) {
- proceed = true;
- break;
- }
- }
-
- if (!proceed) {
- System.out.println("Skipped for " + options.compress);
- return;
- }
-
- if (options.doCreate()) {
- createTFile();
- }
-
- if (options.doRead()) {
- seekTFile();
- }
- }
-
- private static class IntegerRange {
- private final int from, to;
-
- public IntegerRange(int from, int to) {
- this.from = from;
- this.to = to;
- }
-
- public static IntegerRange parse(String s) throws ParseException {
- StringTokenizer st = new StringTokenizer(s, " \t,");
- if (st.countTokens() != 2) {
- throw new ParseException("Bad integer specification: " + s);
- }
- int from = Integer.parseInt(st.nextToken());
- int to = Integer.parseInt(st.nextToken());
- return new IntegerRange(from, to);
- }
-
- public int from() {
- return from;
- }
-
- public int to() {
- return to;
- }
- }
-
- private static class MyOptions {
- // hard coded constants
- int dictSize = 1000;
- int minWordLen = 5;
- int maxWordLen = 20;
- int osInputBufferSize = 64 * 1024;
- int osOutputBufferSize = 64 * 1024;
- int fsInputBufferSizeNone = 0;
- int fsInputBufferSizeLzo = 0;
- int fsInputBufferSizeGz = 0;
- int fsOutputBufferSizeNone = 1;
- int fsOutputBufferSizeLzo = 1;
- int fsOutputBufferSizeGz = 1;
-
- String rootDir =
- System.getProperty("test.build.data", "/tmp/tfile-test");
- String file = "TestTFileSeek";
- String compress = "gz";
- int minKeyLen = 10;
- int maxKeyLen = 50;
- int minValLength = 100;
- int maxValLength = 200;
- int minBlockSize = 64 * 1024;
- int fsOutputBufferSize = 1;
- int fsInputBufferSize = 0;
- long fileSize = 3 * 1024 * 1024;
- long seekCount = 1000;
- long seed;
-
- static final int OP_CREATE = 1;
- static final int OP_READ = 2;
- int op = OP_CREATE | OP_READ;
-
- boolean proceed = false;
-
- public MyOptions(String[] args) {
- seed = System.nanoTime();
-
- try {
- Options opts = buildOptions();
- CommandLineParser parser = new GnuParser();
- CommandLine line = parser.parse(opts, args, true);
- processOptions(line, opts);
- validateOptions();
- }
- catch (ParseException e) {
- System.out.println(e.getMessage());
- System.out.println("Try \"--help\" option for details.");
- setStopProceed();
- }
- }
-
- public boolean proceed() {
- return proceed;
- }
-
- private Options buildOptions() {
- Option compress =
- OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]")
- .hasArg().withDescription("compression scheme").create('c');
-
- Option fileSize =
- OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
- .hasArg().withDescription("target size of the file (in MB).")
- .create('s');
-
- Option fsInputBufferSz =
- OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
- .hasArg().withDescription(
- "size of the file system input buffer (in bytes).").create(
- 'i');
-
- Option fsOutputBufferSize =
- OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
- .hasArg().withDescription(
- "size of the file system output buffer (in bytes).").create(
- 'o');
-
- Option keyLen =
- OptionBuilder
- .withLongOpt("key-length")
- .withArgName("min,max")
- .hasArg()
- .withDescription(
- "the length range of the key (in bytes)")
- .create('k');
-
- Option valueLen =
- OptionBuilder
- .withLongOpt("value-length")
- .withArgName("min,max")
- .hasArg()
- .withDescription(
- "the length range of the value (in bytes)")
- .create('v');
-
- Option blockSz =
- OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
- .withDescription("minimum block size (in KB)").create('b');
-
- Option seed =
- OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg()
- .withDescription("specify the seed").create('S');
-
- Option operation =
- OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
- .withDescription(
- "action: seek-only, create-only, seek-after-create").create(
- 'x');
-
- Option rootDir =
- OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
- .withDescription(
- "specify root directory where files will be created.")
- .create('r');
-
- Option file =
- OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
- .withDescription("specify the file name to be created or read.")
- .create('f');
-
- Option seekCount =
- OptionBuilder
- .withLongOpt("seek")
- .withArgName("count")
- .hasArg()
- .withDescription(
- "specify how many seek operations we perform (requires -x r or -x rw.")
- .create('n');
-
- Option help =
- OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
- "show this screen").create("h");
-
- return new Options().addOption(compress).addOption(fileSize).addOption(
- fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
- .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption(
- operation).addOption(seekCount).addOption(file).addOption(help);
-
- }
-
- private void processOptions(CommandLine line, Options opts)
- throws ParseException {
- // --help -h and --version -V must be processed first.
- if (line.hasOption('h')) {
- HelpFormatter formatter = new HelpFormatter();
- System.out.println("TFile and SeqFile benchmark.");
- System.out.println();
- formatter.printHelp(100,
- "java ... TestTFileSeqFileComparison [options]",
- "\nSupported options:", opts, "");
- return;
- }
-
- if (line.hasOption('c')) {
- compress = line.getOptionValue('c');
- }
-
- if (line.hasOption('d')) {
- dictSize = Integer.parseInt(line.getOptionValue('d'));
- }
-
- if (line.hasOption('s')) {
- fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
- }
-
- if (line.hasOption('i')) {
- fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
- }
-
- if (line.hasOption('o')) {
- fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
- }
-
- if (line.hasOption('n')) {
- seekCount = Integer.parseInt(line.getOptionValue('n'));
- }
-
- if (line.hasOption('k')) {
- IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
- minKeyLen = ir.from();
- maxKeyLen = ir.to();
- }
-
- if (line.hasOption('v')) {
- IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
- minValLength = ir.from();
- maxValLength = ir.to();
- }
-
- if (line.hasOption('b')) {
- minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
- }
-
- if (line.hasOption('r')) {
- rootDir = line.getOptionValue('r');
- }
-
- if (line.hasOption('f')) {
- file = line.getOptionValue('f');
- }
-
- if (line.hasOption('S')) {
- seed = Long.parseLong(line.getOptionValue('S'));
- }
-
- if (line.hasOption('x')) {
- String strOp = line.getOptionValue('x');
- if (strOp.equals("r")) {
- op = OP_READ;
- }
- else if (strOp.equals("w")) {
- op = OP_CREATE;
- }
- else if (strOp.equals("rw")) {
- op = OP_CREATE | OP_READ;
- }
- else {
- throw new ParseException("Unknown action specifier: " + strOp);
- }
- }
-
- proceed = true;
- }
-
- private void validateOptions() throws ParseException {
- if (!compress.equals("none") && !compress.equals("lzo")
- && !compress.equals("gz")) {
- throw new ParseException("Unknown compression scheme: " + compress);
- }
-
- if (minKeyLen >= maxKeyLen) {
- throw new ParseException(
- "Max key length must be greater than min key length.");
- }
-
- if (minValLength >= maxValLength) {
- throw new ParseException(
- "Max value length must be greater than min value length.");
- }
-
- if (minWordLen >= maxWordLen) {
- throw new ParseException(
- "Max word length must be greater than min word length.");
- }
- return;
- }
-
- private void setStopProceed() {
- proceed = false;
- }
-
- public boolean doCreate() {
- return (op & OP_CREATE) != 0;
- }
-
- public boolean doRead() {
- return (op & OP_READ) != 0;
- }
- }
-
- public static void main(String[] argv) throws IOException {
- TestTFileSeek testCase = new TestTFileSeek();
- MyOptions options = new MyOptions(argv);
-
- if (options.proceed == false) {
- return;
- }
-
- testCase.options = options;
- testCase.setUp();
- testCase.testSeeks();
- testCase.tearDown();
- }
-}