You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:38 UTC
[24/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
deleted file mode 100644
index c3d4992..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestStorageManager {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestStorageManager";
- StorageManager sm = null;
- private Path testDir;
- private FileSystem fs;
-
- @Before
- public void setUp() throws Exception {
- conf = new TajoConf();
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- sm = StorageManager.getStorageManager(conf, testDir);
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void testGetScannerAndAppender() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for(int i=0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i].put(new Datum[] {
- DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i)});
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
- fs.mkdirs(path.getParent());
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, path);
- appender.init();
- for(Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
-
- Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(meta, schema, path);
- scanner.init();
- int i=0;
- while(scanner.next() != null) {
- i++;
- }
- assertEquals(4,i);
- }
-
- @Test
- public void testGetSplit() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(1).build();
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplit");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test partitions
- List<Path> partitions = Lists.newArrayList();
- for (int i =0; i < testCount; i++){
- Path tmpFile = new Path(tablePath, String.valueOf(i));
- DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
- partitions.add(tmpFile);
- }
-
- assertTrue(fs.exists(tablePath));
- StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- List<FileFragment> splits = Lists.newArrayList();
- // Get FileFragments in partition batch
- splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
- assertEquals(testCount, splits.size());
- // -1 is unknown volumeId
- assertEquals(-1, splits.get(0).getDiskIds()[0]);
-
- splits.clear();
- splits.addAll(sm.getSplits("data", meta, schema,
- partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
- assertEquals(testCount / 2, splits.size());
- assertEquals(1, splits.get(0).getHosts().length);
- assertEquals(-1, splits.get(0).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown();
-
- File dir = new File(testDataPath);
- dir.delete();
- }
- }
-
- @Test
- public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test files
- for (int i = 0; i < testCount; i++) {
- Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
- DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
- }
- assertTrue(fs.exists(tablePath));
- StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- List<FileFragment> splits = Lists.newArrayList();
- splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
- assertEquals(testCount, splits.size());
- assertEquals(2, splits.get(0).getHosts().length);
- assertEquals(2, splits.get(0).getDiskIds().length);
- assertNotEquals(-1, splits.get(0).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown();
-
- File dir = new File(testDataPath);
- dir.delete();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
deleted file mode 100644
index bd1a1f9..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ /dev/null
@@ -1,868 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.RCFile;
-import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestStorages {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestStorages";
-
- private static String TEST_PROJECTION_AVRO_SCHEMA =
- "{\n" +
- " \"type\": \"record\",\n" +
- " \"namespace\": \"org.apache.tajo\",\n" +
- " \"name\": \"testProjection\",\n" +
- " \"fields\": [\n" +
- " { \"name\": \"id\", \"type\": \"int\" },\n" +
- " { \"name\": \"age\", \"type\": \"long\" },\n" +
- " { \"name\": \"score\", \"type\": \"float\" }\n" +
- " ]\n" +
- "}\n";
-
- private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
- "{\n" +
- " \"type\": \"record\",\n" +
- " \"namespace\": \"org.apache.tajo\",\n" +
- " \"name\": \"testNullHandlingTypes\",\n" +
- " \"fields\": [\n" +
- " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
- " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
- " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
- " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
- " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
- " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
- " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col11\", \"type\": \"null\" },\n" +
- " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
- " ]\n" +
- "}\n";
-
- private StoreType storeType;
- private boolean splitable;
- private boolean statsable;
- private boolean seekable;
- private Path testDir;
- private FileSystem fs;
-
- public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
- this.storeType = type;
- this.splitable = splitable;
- this.statsable = statsable;
- this.seekable = seekable;
-
- conf = new TajoConf();
-
- if (storeType == StoreType.RCFILE) {
- conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
- }
-
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- //type, splitable, statsable, seekable
- {StoreType.CSV, true, true, true},
- {StoreType.RAW, false, true, true},
- {StoreType.RCFILE, true, true, false},
- {StoreType.PARQUET, false, false, false},
- {StoreType.SEQUENCEFILE, true, true, false},
- {StoreType.AVRO, false, false, false},
- {StoreType.TEXTFILE, true, true, false},
- {StoreType.JSON, true, true, false},
- });
- }
-
- @Test
- public void testSplitable() throws IOException {
- if (splitable) {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "Splitable.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(2);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(25l));
- appender.addTuple(vTuple);
- }
- appender.close();
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- long randomNum = (long) (Math.random() * fileLen) + 1;
-
- FileFragment[] tablets = new FileFragment[2];
- tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
- tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
-
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
- assertTrue(scanner.isSplittable());
- scanner.init();
- int tupleCnt = 0;
- while (scanner.next() != null) {
- tupleCnt++;
- }
- scanner.close();
-
- scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
- assertTrue(scanner.isSplittable());
- scanner.init();
- while (scanner.next() != null) {
- tupleCnt++;
- }
- scanner.close();
-
- assertEquals(tupleNum, tupleCnt);
- }
- }
-
- @Test
- public void testRCFileSplitable() throws IOException {
- if (storeType == StoreType.RCFILE) {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "Splitable.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(2);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(25l));
- appender.addTuple(vTuple);
- }
- appender.close();
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- long randomNum = 122; // header size
-
- FileFragment[] tablets = new FileFragment[2];
- tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
- tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
-
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
- assertTrue(scanner.isSplittable());
- scanner.init();
- int tupleCnt = 0;
- while (scanner.next() != null) {
- tupleCnt++;
- }
- scanner.close();
-
- scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
- assertTrue(scanner.isSplittable());
- scanner.init();
- while (scanner.next() != null) {
- tupleCnt++;
- }
- scanner.close();
-
- assertEquals(tupleNum, tupleCnt);
- }
- }
-
- @Test
- public void testProjection() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
- schema.addColumn("score", Type.FLOAT4);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
- if (storeType == StoreType.AVRO) {
- meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
- TEST_PROJECTION_AVRO_SCHEMA);
- }
-
- Path tablePath = new Path(testDir, "testProjection.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(3);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(i + 2));
- vTuple.put(2, DatumFactory.createFloat4(i + 3));
- appender.addTuple(vTuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
-
- Schema target = new Schema();
- target.addColumn("age", Type.INT8);
- target.addColumn("score", Type.FLOAT4);
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment, target);
- scanner.init();
- int tupleCnt = 0;
- Tuple tuple;
- while ((tuple = scanner.next()) != null) {
- if (storeType == StoreType.RCFILE
- || storeType == StoreType.CSV
- || storeType == StoreType.PARQUET
- || storeType == StoreType.SEQUENCEFILE
- || storeType == StoreType.AVRO) {
- assertTrue(tuple.get(0) == null);
- }
- assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
- assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
- tupleCnt++;
- }
- scanner.close();
-
- assertEquals(tupleNum, tupleCnt);
- }
-
- @Test
- public void testVariousTypes() throws IOException {
- boolean handleProtobuf = storeType != StoreType.JSON;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- if (handleProtobuf) {
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
- }
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
- if (storeType == StoreType.AVRO) {
- String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
- meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
- }
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createChar("hyunsik"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("hyunsik"),
- DatumFactory.createBlob("hyunsik".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get()
- });
- if (handleProtobuf) {
- tuple.put(11, factory.createDatum(queryid.getProto()));
- }
-
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple retrieved;
- while ((retrieved = scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- }
-
- @Test
- public void testNullHandlingTypes() throws IOException {
- boolean handleProtobuf = storeType != StoreType.JSON;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
-
- if (handleProtobuf) {
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
- }
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
- meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
- meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
- meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
- meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
- if (storeType == StoreType.AVRO) {
- meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
- TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
- }
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
- int columnNum = 11 + (handleProtobuf ? 1 : 0);
- Tuple seedTuple = new VTuple(columnNum);
- seedTuple.put(new Datum[]{
- DatumFactory.createBool(true), // 0
- DatumFactory.createChar("hyunsik"), // 2
- DatumFactory.createInt2((short) 17), // 3
- DatumFactory.createInt4(59), // 4
- DatumFactory.createInt8(23l), // 5
- DatumFactory.createFloat4(77.9f), // 6
- DatumFactory.createFloat8(271.9f), // 7
- DatumFactory.createText("hyunsik"), // 8
- DatumFactory.createBlob("hyunsik".getBytes()),// 9
- DatumFactory.createInet4("192.168.0.1"), // 10
- NullDatum.get(), // 11
- });
-
- if (handleProtobuf) {
- seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12
- }
-
- // Making tuples with different null column positions
- Tuple tuple;
- for (int i = 0; i < columnNum; i++) {
- tuple = new VTuple(columnNum);
- for (int j = 0; j < columnNum; j++) {
- if (i == j) { // i'th column will have NULL value
- tuple.put(j, NullDatum.get());
- } else {
- tuple.put(j, seedTuple.get(j));
- }
- }
- appender.addTuple(tuple);
- }
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple retrieved;
- int i = 0;
- while ((retrieved = scanner.next()) != null) {
- assertEquals(columnNum, retrieved.size());
- for (int j = 0; j < columnNum; j++) {
- if (i == j) {
- assertEquals(NullDatum.get(), retrieved.get(j));
- } else {
- assertEquals(seedTuple.get(j), retrieved.get(j));
- }
- }
-
- i++;
- }
- scanner.close();
- }
-
- @Test
- public void testRCFileTextSerializeDeserialize() throws IOException {
- if(storeType != StoreType.RCFILE) return;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(13);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
-
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple retrieved;
- while ((retrieved=scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-
- @Test
- public void testRCFileBinarySerializeDeserialize() throws IOException {
- if(storeType != StoreType.RCFILE) return;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(13);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
-
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple retrieved;
- while ((retrieved=scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-
- @Test
- public void testSequenceFileTextSerializeDeserialize() throws IOException {
- if(storeType != StoreType.SEQUENCEFILE) return;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(13);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
-
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- assertTrue(scanner instanceof SequenceFileScanner);
- Writable key = ((SequenceFileScanner) scanner).getKey();
- assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
-
- Tuple retrieved;
- while ((retrieved=scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-
- @Test
- public void testSequenceFileBinarySerializeDeserialize() throws IOException {
- if(storeType != StoreType.SEQUENCEFILE) return;
-
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
-
- Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
- appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(13);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
-
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- assertTrue(scanner instanceof SequenceFileScanner);
- Writable key = ((SequenceFileScanner) scanner).getKey();
- assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
-
- Tuple retrieved;
- while ((retrieved=scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-
- @Test
- public void testTime() throws IOException {
- if (storeType == StoreType.CSV || storeType == StoreType.RAW) {
- Schema schema = new Schema();
- schema.addColumn("col1", Type.DATE);
- schema.addColumn("col2", Type.TIME);
- schema.addColumn("col3", Type.TIMESTAMP);
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-
- Path tablePath = new Path(testDir, "testTime.data");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- Tuple tuple = new VTuple(3);
- tuple.put(new Datum[]{
- DatumFactory.createDate("1980-04-01"),
- DatumFactory.createTime("12:34:56"),
- DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
- });
- appender.addTuple(tuple);
- appender.flush();
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple retrieved;
- while ((retrieved = scanner.next()) != null) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
- }
- }
- scanner.close();
- }
- }
-
- @Test
- public void testSeekableScanner() throws IOException {
- if (!seekable) {
- return;
- }
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
- schema.addColumn("comment", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "Seekable.data");
- FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
- tablePath);
- appender.enableStats();
- appender.init();
- int tupleNum = 100000;
- VTuple vTuple;
-
- List<Long> offsets = Lists.newArrayList();
- offsets.add(0L);
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(3);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(25l));
- vTuple.put(2, DatumFactory.createText("test" + i));
- appender.addTuple(vTuple);
-
- // find a seek position
- if (i % (tupleNum / 3) == 0) {
- offsets.add(appender.getOffset());
- }
- }
-
- // end of file
- if (!offsets.contains(appender.getOffset())) {
- offsets.add(appender.getOffset());
- }
-
- appender.close();
- if (statsable) {
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
- }
-
- FileStatus status = fs.getFileStatus(tablePath);
- assertEquals(status.getLen(), appender.getOffset());
-
- Scanner scanner;
- int tupleCnt = 0;
- long prevOffset = 0;
- long readBytes = 0;
- long readRows = 0;
- for (long offset : offsets) {
- scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema,
- new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
- scanner.init();
-
- while (scanner.next() != null) {
- tupleCnt++;
- }
-
- scanner.close();
- if (statsable) {
- readBytes += scanner.getInputStats().getNumBytes();
- readRows += scanner.getInputStats().getNumRows();
- }
- prevOffset = offset;
- }
-
- assertEquals(tupleNum, tupleCnt);
- if (statsable) {
- assertEquals(appender.getStats().getNumBytes().longValue(), readBytes);
- assertEquals(appender.getStats().getNumRows().longValue(), readRows);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
deleted file mode 100644
index 639ca04..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestTupleComparator {
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void testCompare() {
- Schema schema = new Schema();
- schema.addColumn("col1", Type.INT4);
- schema.addColumn("col2", Type.INT4);
- schema.addColumn("col3", Type.INT4);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.TEXT);
-
- Tuple tuple1 = new VTuple(5);
- Tuple tuple2 = new VTuple(5);
-
- tuple1.put(
- new Datum[] {
- DatumFactory.createInt4(9),
- DatumFactory.createInt4(3),
- DatumFactory.createInt4(33),
- DatumFactory.createInt4(4),
- DatumFactory.createText("abc")});
- tuple2.put(
- new Datum[] {
- DatumFactory.createInt4(1),
- DatumFactory.createInt4(25),
- DatumFactory.createInt4(109),
- DatumFactory.createInt4(4),
- DatumFactory.createText("abd")});
-
- SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
- SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
-
- BaseTupleComparator tc = new BaseTupleComparator(schema,
- new SortSpec[] {sortKey1, sortKey2});
- assertEquals(-1, tc.compare(tuple1, tuple2));
- assertEquals(1, tc.compare(tuple2, tuple1));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
deleted file mode 100644
index 9837fd1..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.tajo.datum.DatumFactory;
-
-import static org.junit.Assert.*;
-
-public class TestVTuple {
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
-
- }
-
- @Test
- public void testContain() {
- VTuple t1 = new VTuple(260);
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(1));
- t1.put(27, DatumFactory.createInt4(1));
- t1.put(96, DatumFactory.createInt4(1));
- t1.put(257, DatumFactory.createInt4(1));
-
- assertTrue(t1.contains(0));
- assertTrue(t1.contains(1));
- assertFalse(t1.contains(2));
- assertFalse(t1.contains(3));
- assertFalse(t1.contains(4));
- assertTrue(t1.contains(27));
- assertFalse(t1.contains(28));
- assertFalse(t1.contains(95));
- assertTrue(t1.contains(96));
- assertFalse(t1.contains(97));
- assertTrue(t1.contains(257));
- }
-
- @Test
- public void testPut() {
- VTuple t1 = new VTuple(260);
- t1.put(0, DatumFactory.createText("str"));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(257, DatumFactory.createFloat4(0.76f));
-
- assertTrue(t1.contains(0));
- assertTrue(t1.contains(1));
-
- assertEquals(t1.getText(0),"str");
- assertEquals(t1.get(1).asInt4(),2);
- assertTrue(t1.get(257).asFloat4() == 0.76f);
- }
-
- @Test
- public void testEquals() {
- Tuple t1 = new VTuple(5);
- Tuple t2 = new VTuple(5);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
-
- t2.put(0, DatumFactory.createInt4(1));
- t2.put(1, DatumFactory.createInt4(2));
- t2.put(3, DatumFactory.createInt4(2));
-
- assertEquals(t1,t2);
-
- Tuple t3 = new VTuple(5);
- t2.put(0, DatumFactory.createInt4(1));
- t2.put(1, DatumFactory.createInt4(2));
- t2.put(4, DatumFactory.createInt4(2));
-
- assertNotSame(t1,t3);
- }
-
- @Test
- public void testHashCode() {
- Tuple t1 = new VTuple(5);
- Tuple t2 = new VTuple(5);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
- t1.put(4, DatumFactory.createText("hyunsik"));
-
- t2.put(0, DatumFactory.createInt4(1));
- t2.put(1, DatumFactory.createInt4(2));
- t2.put(3, DatumFactory.createInt4(2));
- t2.put(4, DatumFactory.createText("hyunsik"));
-
- assertEquals(t1.hashCode(),t2.hashCode());
-
- Tuple t3 = new VTuple(5);
- t3.put(0, DatumFactory.createInt4(1));
- t3.put(1, DatumFactory.createInt4(2));
- t3.put(4, DatumFactory.createInt4(2));
-
- assertNotSame(t1.hashCode(),t3.hashCode());
- }
-
- @Test
- public void testPutTuple() {
- Tuple t1 = new VTuple(5);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(2, DatumFactory.createInt4(3));
-
- Tuple t2 = new VTuple(2);
- t2.put(0, DatumFactory.createInt4(4));
- t2.put(1, DatumFactory.createInt4(5));
-
- t1.put(3, t2);
-
- for (int i = 0; i < 5; i++) {
- assertEquals(i+1, t1.get(i).asInt4());
- }
- }
-
- @Test
- public void testClone() throws CloneNotSupportedException {
- Tuple t1 = new VTuple(5);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
- t1.put(4, DatumFactory.createText("str"));
-
- VTuple t2 = (VTuple) t1.clone();
- assertNotSame(t1, t2);
- assertEquals(t1, t2);
-
- assertSame(t1.get(4), t2.get(4));
-
- t1.clear();
- assertFalse(t1.equals(t2));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
deleted file mode 100644
index a79e8ab..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import org.apache.avro.Schema;
-import org.apache.tajo.HttpFileServer;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.NetUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
- */
-public class TestAvroUtil {
- private Schema expected;
- private URL schemaUrl;
-
- @Before
- public void setUp() throws Exception {
- schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
- assertNotNull(schemaUrl);
-
- File file = new File(schemaUrl.getPath());
- assertTrue(file.exists());
-
- expected = new Schema.Parser().parse(file);
- }
-
- @Test
- public void testGetSchema() throws IOException, URISyntaxException {
- TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
- meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
- Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
- assertEquals(expected, schema);
-
- meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
- meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
- schema = AvroUtil.getAvroSchema(meta, new TajoConf());
- assertEquals(expected, schema);
-
- HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
- try {
- server.start();
- InetSocketAddress addr = server.getBindAddress();
-
- String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
- meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
- meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
- schema = AvroUtil.getAvroSchema(meta, new TajoConf());
- } finally {
- server.stop();
- }
- assertEquals(expected, schema);
- }
-
- @Test
- public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
- HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
- try {
- server.start();
- InetSocketAddress addr = server.getBindAddress();
-
- Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
- assertEquals(expected, schema);
- } finally {
- server.stop();
- }
- }
-
- @Test
- public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
- Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
-
- assertEquals(expected, schema);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
deleted file mode 100644
index 7900195..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBSTIndex {
- private TajoConf conf;
- private Schema schema;
- private TableMeta meta;
-
- private static final int TUPLE_NUM = 10000;
- private static final int LOAD_NUM = 100;
- private static final String TEST_PATH = "target/test-data/TestIndex";
- private Path testDir;
- private FileSystem fs;
- private StoreType storeType;
-
- public TestBSTIndex(StoreType type) {
- this.storeType = type;
- conf = new TajoConf();
- conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
- schema = new Schema();
- schema.addColumn(new Column("int", Type.INT4));
- schema.addColumn(new Column("long", Type.INT8));
- schema.addColumn(new Column("double", Type.FLOAT8));
- schema.addColumn(new Column("float", Type.FLOAT4));
- schema.addColumn(new Column("string", Type.TEXT));
- }
-
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][]{
- {StoreType.CSV},
- {StoreType.RAW}
- });
- }
-
- @Before
- public void setUp() throws Exception {
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Test
- public void testFindValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindValue_" + storeType);
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX,
- keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.size());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
- assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testBuildIndexWithAppender() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
- FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
- tablePath);
- appender.init();
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- Tuple tuple;
- long offset;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
-
- offset = appender.getOffset();
- appender.addTuple(tuple);
- creater.write(tuple, offset);
- }
- appender.flush();
- appender.close();
-
- creater.flush();
- creater.close();
-
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- tuple = new VTuple(keySchema.size());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
- assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testFindOmittedValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i += 2) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
- keyTuple.put(0, DatumFactory.createInt8(i));
- keyTuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(keyTuple);
- assertEquals(-1, offsets);
- }
- reader.close();
- }
-
- @Test
- public void testFindNextKeyValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple result;
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]",
- (i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8()));
- assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8()));
- }
- reader.close();
- scanner.close();
- }
-
-  /**
-   * Checks "find next key" lookups for keys that are absent from the index.
-   *
-   * Only even keys (0, 2, 4, ...) are written and indexed on the (int, long) columns. For each odd
-   * key i the test expects {@code reader.find(key, true)} to point at the row holding i + 1.
-   *
-   * @throws IOException propagated from the storage and index calls
-   */
-  @Test
-  public void testFindNextKeyOmittedValue() throws IOException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    // Write every second row number so that the odd keys are missing.
-    Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-    Tuple tuple;
-    for (int i = 0; i < TUPLE_NUM; i += 2) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
-    SortSpec[] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
-    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", Type.INT4));
-    keySchema.addColumn(new Column("long", Type.INT8));
-
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    // Index every row: the offset must be read before next() consumes the row.
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-
-    creater.flush();
-    creater.close();
-    scanner.close();
-
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
-        keySchema, comp);
-    reader.open();
-    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    Tuple result;
-    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
-      keyTuple = new VTuple(2);
-      keyTuple.put(0, DatumFactory.createInt4(i));
-      keyTuple.put(1, DatumFactory.createInt8(i));
-      long offsets = reader.find(keyTuple, true);
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
-    }
-    // Release the index reader as well as the scanner.
-    reader.close();
-    scanner.close();
-  }
-
-  /**
-   * Looks up a key smaller than every indexed key in a (long, double) index.
-   *
-   * Rows 5 .. TUPLE_NUM + 4 are written and indexed. For the probe key (0, 0.0) the test expects an
-   * exact {@code find} to return -1 and a {@code find(key, true)} to return a non-negative offset
-   * whose row carries the value 5.
-   *
-   * @throws IOException propagated from the storage and index calls
-   */
-  @Test
-  public void testFindMinValue() throws IOException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    final Path dataPath = new Path(testDir, "testFindMinValue" + storeType);
-    final Path indexPath = new Path(testDir, "testFindMinValue_" + storeType + ".idx");
-
-    // Populate the table; every column of a row carries that row's number.
-    Appender tableWriter = StorageManager.getStorageManager(conf).getAppender(meta, schema, dataPath);
-    tableWriter.init();
-    int rowNo = 5;
-    while (rowNo < TUPLE_NUM + 5) {
-      Tuple row = new VTuple(5);
-      row.put(0, DatumFactory.createInt4(rowNo));
-      row.put(1, DatumFactory.createInt8(rowNo));
-      row.put(2, DatumFactory.createFloat8(rowNo));
-      row.put(3, DatumFactory.createFloat4(rowNo));
-      row.put(4, DatumFactory.createText("field_" + rowNo));
-      tableWriter.addTuple(row);
-      rowNo++;
-    }
-    tableWriter.close();
-
-    FileStatus fileStatus = fs.getFileStatus(dataPath);
-    FileFragment fragment = new FileFragment("table1_1", fileStatus.getPath(), 0, fileStatus.getLen());
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", Type.INT8));
-    keySchema.addColumn(new Column("double", Type.FLOAT8));
-
-    SortSpec[] sortKeys = {
-        new SortSpec(schema.getColumn("long"), true, false),
-        new SortSpec(schema.getColumn("double"), true, false)
-    };
-    BaseTupleComparator comparator = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex index = new BSTIndex(conf);
-    BSTIndexWriter indexWriter = index.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
-    indexWriter.setLoadNum(LOAD_NUM);
-    indexWriter.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, fragment, schema);
-    scanner.init();
-
-    // Index (long, double) of every row; the offset is sampled before each next() call.
-    long rowOffset = scanner.getNextOffset();
-    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
-      Tuple key = new VTuple(2);
-      key.put(0, row.get(1));
-      key.put(1, row.get(2));
-      indexWriter.write(key, rowOffset);
-      rowOffset = scanner.getNextOffset();
-    }
-
-    indexWriter.flush();
-    indexWriter.close();
-    scanner.close();
-
-    BSTIndexReader reader = index.getIndexReader(indexPath, keySchema, comparator);
-    reader.open();
-    scanner = StorageManager.getSeekableScanner(conf, meta, schema, fragment, schema);
-    scanner.init();
-
-    // Probe with a key that lies below the first stored key.
-    Tuple probe = new VTuple(keySchema.size());
-    probe.put(0, DatumFactory.createInt8(0));
-    probe.put(1, DatumFactory.createFloat8(0));
-
-    long found = reader.find(probe);
-    assertEquals(-1, found);
-
-    found = reader.find(probe, true);
-    assertTrue(found >= 0);
-    scanner.seek(found);
-    Tuple firstRow = scanner.next();
-    assertEquals(5, firstRow.get(1).asInt4());
-    assertEquals(5l, firstRow.get(2).asInt8());
-    reader.close();
-    scanner.close();
-  }
-
-  /**
-   * Checks the first and last keys reported by a BST index built over the (int, long) columns.
-   *
-   * Rows 5, 7, 9, ... below TUPLE_NUM are written and indexed. The test expects
-   * {@code getFirstKey()} to carry 5 and {@code getLastKey()} to carry TUPLE_NUM - 1 in both key
-   * columns.
-   *
-   * @throws IOException propagated from the storage and index calls
-   */
-  @Test
-  public void testMinMax() throws IOException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    Path tablePath = new Path(testDir, "testMinMax_" + storeType);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-    Tuple tuple;
-    for (int i = 5; i < TUPLE_NUM; i += 2) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
-    SortSpec[] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
-    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", Type.INT4));
-    keySchema.addColumn(new Column("long", Type.INT8));
-
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    // Index every row: the offset must be read before next() consumes the row.
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-
-    creater.flush();
-    creater.close();
-    scanner.close();
-
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
-        keySchema, comp);
-    reader.open();
-
-    // Key column 0 is the INT4 part and key column 1 the INT8 part; verify each one.
-    Tuple min = reader.getFirstKey();
-    assertEquals(5, min.get(0).asInt4());
-    assertEquals(5l, min.get(1).asInt8());
-
-    Tuple max = reader.getLastKey();
-    assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
-    assertEquals(TUPLE_NUM - 1, max.get(1).asInt8());
-    reader.close();
-  }
-
-  /**
-   * Worker that performs 10000 random key lookups through a shared {@link BSTIndexReader} and
-   * records whether any lookup missed (returned -1) or threw an exception.
-   */
-  private class ConcurrentAccessor implements Runnable {
-    final BSTIndexReader reader;
-    final Random rnd = new Random(System.currentTimeMillis());
-    // Written by the worker thread and read by the test thread through isFailed().
-    volatile boolean failed = false;
-
-    ConcurrentAccessor(BSTIndexReader reader) {
-      this.reader = reader;
-    }
-
-    /**
-     * @return true if at least one lookup missed or threw
-     */
-    public boolean isFailed() {
-      return this.failed;
-    }
-
-    @Override
-    public void run() {
-      Tuple findKey = new VTuple(2);
-      int keyVal;
-      for (int i = 0; i < 10000; i++) {
-        keyVal = rnd.nextInt(10000);
-        findKey.put(0, DatumFactory.createInt4(keyVal));
-        findKey.put(1, DatumFactory.createInt8(keyVal));
-        try {
-          // Record a miss explicitly. An assertTrue here would throw AssertionError, which is an
-          // Error and would escape catch (Exception), ending this thread with failed still false.
-          if (reader.find(findKey) == -1) {
-            this.failed = true;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          this.failed = true;
-        }
-      }
-    }
-  }
-
-  /**
-   * Runs five {@code ConcurrentAccessor} threads against a single opened index reader and expects
-   * none of them to report a failure.
-   *
-   * The index covers the (int, long) columns of a table holding rows 0 .. TUPLE_NUM - 1.
-   *
-   * @throws IOException propagated from the storage and index calls
-   * @throws InterruptedException if joining a worker thread is interrupted
-   */
-  @Test
-  public void testConcurrentAccess() throws IOException, InterruptedException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    final Path dataPath = new Path(testDir, "testConcurrentAccess_" + storeType);
-    final Path indexPath = new Path(testDir, "testConcurrentAccess_" + storeType + ".idx");
-
-    // Populate the table; every column of a row carries that row's number.
-    Appender tableWriter = StorageManager.getStorageManager(conf).getAppender(meta, schema, dataPath);
-    tableWriter.init();
-    int rowNo = 0;
-    while (rowNo < TUPLE_NUM) {
-      Tuple row = new VTuple(5);
-      row.put(0, DatumFactory.createInt4(rowNo));
-      row.put(1, DatumFactory.createInt8(rowNo));
-      row.put(2, DatumFactory.createFloat8(rowNo));
-      row.put(3, DatumFactory.createFloat4(rowNo));
-      row.put(4, DatumFactory.createText("field_" + rowNo));
-      tableWriter.addTuple(row);
-      rowNo++;
-    }
-    tableWriter.close();
-
-    FileStatus fileStatus = fs.getFileStatus(dataPath);
-    FileFragment fragment = new FileFragment("table1_1", fileStatus.getPath(), 0, fileStatus.getLen());
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", Type.INT4));
-    keySchema.addColumn(new Column("long", Type.INT8));
-
-    SortSpec[] sortKeys = {
-        new SortSpec(schema.getColumn("int"), true, false),
-        new SortSpec(schema.getColumn("long"), true, false)
-    };
-    BaseTupleComparator comparator = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex index = new BSTIndex(conf);
-    BSTIndexWriter indexWriter = index.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
-    indexWriter.setLoadNum(LOAD_NUM);
-    indexWriter.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, fragment, schema);
-    scanner.init();
-
-    // Index (int, long) of every row; the offset is sampled before each next() call.
-    long rowOffset = scanner.getNextOffset();
-    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
-      Tuple key = new VTuple(2);
-      key.put(0, row.get(0));
-      key.put(1, row.get(1));
-      indexWriter.write(key, rowOffset);
-      rowOffset = scanner.getNextOffset();
-    }
-
-    indexWriter.flush();
-    indexWriter.close();
-    scanner.close();
-
-    BSTIndexReader reader = index.getIndexReader(indexPath, keySchema, comparator);
-    reader.open();
-
-    // Start all workers on the same reader, then wait for each and check its outcome.
-    final int workerCount = 5;
-    Thread[] workers = new Thread[workerCount];
-    ConcurrentAccessor[] accessors = new ConcurrentAccessor[workerCount];
-    for (int w = 0; w < workers.length; w++) {
-      ConcurrentAccessor accessor = new ConcurrentAccessor(reader);
-      Thread worker = new Thread(accessor);
-      accessors[w] = accessor;
-      workers[w] = worker;
-      worker.start();
-    }
-
-    for (int w = 0; w < workers.length; w++) {
-      workers[w].join();
-      assertFalse(accessors[w].isFailed());
-    }
-    reader.close();
-  }
-
-
-  /**
-   * Checks exact-key lookups on a BST index built over the (long, double) columns in descending order.
-   *
-   * Rows TUPLE_NUM - 1 .. 0 are written in descending order and indexed with descending sort specs.
-   * For every key i the test expects {@code reader.find(key)} to point at the row holding i, and
-   * the following {@code reader.next()} (unless it returns -1) to point at the row holding i - 1.
-   *
-   * @throws IOException propagated from the storage and index calls
-   */
-  @Test
-  public void testFindValueDescOrder() throws IOException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-
-    Tuple tuple;
-    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
-    SortSpec[] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false);
-    sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", Type.INT8));
-    keySchema.addColumn(new Column("double", Type.FLOAT8));
-
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    // Index every row: the offset must be read before next() consumes the row.
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-
-      keyTuple.put(0, tuple.get(1));
-      keyTuple.put(1, tuple.get(2));
-      creater.write(keyTuple, offset);
-    }
-
-    creater.flush();
-    creater.close();
-    scanner.close();
-
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
-        keySchema, comp);
-    reader.open();
-    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    // Keep the search key in its own tuple. Reusing the variable that receives scanner.next()
-    // would make later iterations write key values into a row returned by the scanner.
-    keyTuple = new VTuple(keySchema.size());
-    Tuple result;
-    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
-      keyTuple.put(0, DatumFactory.createInt8(i));
-      keyTuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(keyTuple);
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("seek check [" + (i) + " ," + (result.get(1).asInt8()) + "]", (i) == (result.get(1).asInt8()));
-      assertTrue("seek check [" + (i) + " ," + (result.get(2).asFloat8()) + "]", (i) == (result.get(2).asFloat8()));
-
-      offsets = reader.next();
-      if (offsets == -1) {
-        continue;
-      }
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
-    }
-    reader.close();
-    scanner.close();
-  }
-
-  /**
-   * Checks "find next key" lookups on a BST index built over the (int, long) columns in descending
-   * order.
-   *
-   * Rows TUPLE_NUM - 1 .. 0 are written in descending order and indexed with descending sort specs.
-   * The reader is expected to report the key schema and comparator it was opened with. For every
-   * key i the test expects {@code reader.find(key, true)} to point at the row holding i - 1, and
-   * the following {@code reader.next()} (unless it returns -1) to point at the row holding i - 2.
-   *
-   * @throws IOException propagated from the storage and index calls
-   */
-  @Test
-  public void testFindNextKeyValueDescOrder() throws IOException {
-    meta = CatalogUtil.newTableMeta(storeType);
-
-    Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-
-    Tuple tuple;
-    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
-    SortSpec[] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false);
-    sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", Type.INT4));
-    keySchema.addColumn(new Column("long", Type.INT8));
-
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
-        "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    // Index every row: the offset must be read before next() consumes the row.
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-
-    creater.flush();
-    creater.close();
-    scanner.close();
-
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"),
-        keySchema, comp);
-    reader.open();
-
-    assertEquals(keySchema, reader.getKeySchema());
-    assertEquals(comp, reader.getComparator());
-
-    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    Tuple result;
-    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
-      keyTuple = new VTuple(2);
-      keyTuple.put(0, DatumFactory.createInt4(i));
-      keyTuple.put(1, DatumFactory.createInt8(i));
-      long offsets = reader.find(keyTuple, true);
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i - 1) + " ]",
-          (i - 1) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
-
-      offsets = reader.next();
-      if (offsets == -1) {
-        continue;
-      }
-      scanner.seek(offsets);
-      result = scanner.next();
-      // Column 0 is INT4 and column 1 is INT8, so read them with the matching accessors
-      // (the same ones used for the first hop above).
-      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asInt8()));
-    }
-    reader.close();
-    scanner.close();
-  }
-}