You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:17 UTC
[02/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
new file mode 100644
index 0000000..4081a80
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -0,0 +1,867 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestStorages";
+
+ private static String TEST_PROJECTION_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testProjection\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"id\", \"type\": \"int\" },\n" +
+ " { \"name\": \"age\", \"type\": \"long\" },\n" +
+ " { \"name\": \"score\", \"type\": \"float\" }\n" +
+ " ]\n" +
+ "}\n";
+
+ private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testNullHandlingTypes\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
+ " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
+ " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
+ " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
+ " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
+ " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
+ " { \"name\": \"col12\", \"type\": \"null\" },\n" +
+ " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
+ " ]\n" +
+ "}\n";
+
+ private StoreType storeType;
+ private boolean splitable;
+ private boolean statsable;
+ private boolean seekable;
+ private Path testDir;
+ private FileSystem fs;
+
+ /**
+  * Builds one parameterized test instance.
+  *
+  * @param type store type under test
+  * @param splitable whether testSplitable should run for this store type
+  * @param statsable whether testSeekableScanner should verify statistics for this store type
+  * @param seekable whether testSeekableScanner should run for this store type
+  * @throws IOException if the test directory or its file system cannot be obtained
+  */
+ public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
+   this.storeType = type;
+   this.splitable = splitable;
+   this.statsable = statsable;
+   this.seekable = seekable;
+
+   this.conf = new TajoConf();
+
+   // RCFile runs with a record interval of 100.
+   if (StoreType.RCFILE == this.storeType) {
+     this.conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+   }
+
+   this.testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+   this.fs = this.testDir.getFileSystem(this.conf);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ //type, splitable, statsable, seekable
+ {StoreType.CSV, true, true, true},
+ {StoreType.RAW, false, true, true},
+ {StoreType.RCFILE, true, true, false},
+ {StoreType.PARQUET, false, false, false},
+ {StoreType.SEQUENCEFILE, true, true, false},
+ {StoreType.AVRO, false, false, false},
+ {StoreType.TEXTFILE, true, true, false},
+ });
+ }
+
+ /**
+  * Writes 10000 two-column tuples, cuts the resulting file into two fragments at a
+  * random offset and checks that scanning both fragments returns 10000 tuples in total.
+  * Does nothing for store types that are not flagged as splitable.
+  */
+ @Test
+ public void testSplitable() throws IOException {
+   if (!splitable) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("id", Type.INT4);
+   schema.addColumn("age", Type.INT8);
+
+   TableMeta meta = CatalogUtil.newTableMeta(storeType);
+   Path tablePath = new Path(testDir, "Splitable.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   final int expectedRows = 10000;
+   for (int row = 1; row <= expectedRows; row++) {
+     VTuple record = new VTuple(2);
+     record.put(0, DatumFactory.createInt4(row));
+     record.put(1, DatumFactory.createInt8(25L));
+     appender.addTuple(record);
+   }
+   appender.close();
+   TableStats written = appender.getStats();
+   assertEquals(expectedRows, written.getNumRows().longValue());
+
+   long fileLen = fs.getFileStatus(tablePath).getLen();
+   // split point somewhere in [1, fileLen]
+   long splitPos = (long) (Math.random() * fileLen) + 1;
+
+   FileFragment[] tablets = {
+       new FileFragment("Splitable", tablePath, 0, splitPos),
+       new FileFragment("Splitable", tablePath, splitPos, (fileLen - splitPos))
+   };
+
+   int scannedRows = 0;
+   for (FileFragment tablet : tablets) {
+     Scanner scanner = sm.getScanner(meta, schema, tablet, schema);
+     assertTrue(scanner.isSplittable());
+     scanner.init();
+     while (scanner.next() != null) {
+       scannedRows++;
+     }
+     scanner.close();
+   }
+
+   assertEquals(expectedRows, scannedRows);
+ }
+
+ /**
+  * RCFile-only variant of the split test: the file is cut at offset 122 (the value the
+  * original test labels "header size") and both fragments together must return all
+  * 10000 written tuples. Does nothing for other store types.
+  */
+ @Test
+ public void testRCFileSplitable() throws IOException {
+   if (storeType != StoreType.RCFILE) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("id", Type.INT4);
+   schema.addColumn("age", Type.INT8);
+
+   TableMeta meta = CatalogUtil.newTableMeta(storeType);
+   Path tablePath = new Path(testDir, "Splitable.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   final int expectedRows = 10000;
+   for (int row = 1; row <= expectedRows; row++) {
+     VTuple record = new VTuple(2);
+     record.put(0, DatumFactory.createInt4(row));
+     record.put(1, DatumFactory.createInt8(25L));
+     appender.addTuple(record);
+   }
+   appender.close();
+   TableStats written = appender.getStats();
+   assertEquals(expectedRows, written.getNumRows().longValue());
+
+   long fileLen = fs.getFileStatus(tablePath).getLen();
+   long headerSize = 122; // header size
+
+   FileFragment[] tablets = {
+       new FileFragment("Splitable", tablePath, 0, headerSize),
+       new FileFragment("Splitable", tablePath, headerSize, (fileLen - headerSize))
+   };
+
+   int scannedRows = 0;
+   for (FileFragment tablet : tablets) {
+     Scanner scanner = sm.getScanner(meta, schema, tablet, schema);
+     assertTrue(scanner.isSplittable());
+     scanner.init();
+     while (scanner.next() != null) {
+       scannedRows++;
+     }
+     scanner.close();
+   }
+
+   assertEquals(expectedRows, scannedRows);
+ }
+
+ @Test
+ public void testProjection() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("score", Type.FLOAT4);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_PROJECTION_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testProjection.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(i + 2));
+ vTuple.put(2, DatumFactory.createFloat4(i + 3));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+ Schema target = new Schema();
+ target.addColumn("age", Type.INT8);
+ target.addColumn("score", Type.FLOAT4);
+ Scanner scanner = sm.getScanner(meta, schema, fragment, target);
+ scanner.init();
+ int tupleCnt = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ if (storeType == StoreType.RCFILE
+ || storeType == StoreType.CSV
+ || storeType == StoreType.PARQUET
+ || storeType == StoreType.SEQUENCEFILE
+ || storeType == StoreType.AVRO) {
+ assertTrue(tuple.get(0) == null);
+ }
+ assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+ assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
+ tupleCnt++;
+ }
+ scanner.close();
+
+ assertEquals(tupleNum, tupleCnt);
+ }
+
+ @Test
+ public void testVariousTypes() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
+ String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
+ meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
+ }
+
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("hyunsik"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("hyunsik"),
+ DatumFactory.createBlob("hyunsik".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = sm.getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved = scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ }
+
+ /**
+  * Writes 13 tuples derived from one seed tuple, where the i-th tuple has its i-th
+  * column replaced by NULL, and checks that each row reads back with NULL in exactly
+  * that position and the seed values everywhere else.
+  *
+  * The number of rows read back is asserted too; without it a scanner that
+  * returned fewer rows (or none) would pass unnoticed.
+  */
+ @Test
+ public void testNullHandlingTypes() throws IOException {
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   KeyValueSet options = new KeyValueSet();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+   meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+   // Null-representation and serde options are set for every format up front,
+   // regardless of which store type this instance runs with.
+   meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
+   meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
+   meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+   meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
+   if (storeType == StoreType.AVRO) {
+     meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+         TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+   }
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple seedTuple = new VTuple(13);
+   seedTuple.put(new Datum[]{
+       DatumFactory.createBool(true), // 0
+       DatumFactory.createBit((byte) 0x99), // 1
+       DatumFactory.createChar("hyunsik"), // 2
+       DatumFactory.createInt2((short) 17), // 3
+       DatumFactory.createInt4(59), // 4
+       DatumFactory.createInt8(23l), // 5
+       DatumFactory.createFloat4(77.9f), // 6
+       DatumFactory.createFloat8(271.9f), // 7
+       DatumFactory.createText("hyunsik"), // 8
+       DatumFactory.createBlob("hyunsik".getBytes()),// 9
+       DatumFactory.createInet4("192.168.0.1"), // 10
+       NullDatum.get(), // 11
+       factory.createDatum(queryid.getProto()) // 12
+   });
+
+   // Making tuples with different null column positions
+   Tuple tuple;
+   for (int i = 0; i < 13; i++) {
+     tuple = new VTuple(13);
+     for (int j = 0; j < 13; j++) {
+       if (i == j) { // i'th column will have NULL value
+         tuple.put(j, NullDatum.get());
+       } else {
+         tuple.put(j, seedTuple.get(j));
+       }
+     }
+     appender.addTuple(tuple);
+   }
+   appender.flush();
+   appender.close();
+
+   FileStatus status = fs.getFileStatus(tablePath);
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+   scanner.init();
+
+   Tuple retrieved;
+   int i = 0;
+   while ((retrieved = scanner.next()) != null) {
+     assertEquals(13, retrieved.size());
+     for (int j = 0; j < 13; j++) {
+       if (i == j) {
+         assertEquals(NullDatum.get(), retrieved.get(j));
+       } else {
+         assertEquals(seedTuple.get(j), retrieved.get(j));
+       }
+     }
+
+     i++;
+   }
+   scanner.close();
+
+   // All 13 written rows must have been read back.
+   assertEquals(13, i);
+ }
+
+ /**
+  * RCFile-only: round-trips one 13-column tuple with the text serializer/deserializer
+  * selected, and checks the written file length and the scanner's input statistics
+  * against the appender's statistics. Does nothing for other store types.
+  *
+  * The serde class is registered under RCFILE_SERDE. The previous revision used
+  * CSVFILE_SERDE, a key for a different format, so the text serde was presumably never
+  * applied to the RCFile table here (NOTE(review): confirm against the RCFile
+  * appender's option lookup).
+  */
+ @Test
+ public void testRCFileTextSerializeDeserialize() throws IOException {
+   if(storeType != StoreType.RCFILE) return;
+
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   KeyValueSet options = new KeyValueSet();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+   meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple tuple = new VTuple(13);
+   tuple.put(new Datum[] {
+       DatumFactory.createBool(true),
+       DatumFactory.createBit((byte) 0x99),
+       DatumFactory.createChar("jinho"),
+       DatumFactory.createInt2((short) 17),
+       DatumFactory.createInt4(59),
+       DatumFactory.createInt8(23l),
+       DatumFactory.createFloat4(77.9f),
+       DatumFactory.createFloat8(271.9f),
+       DatumFactory.createText("jinho"),
+       DatumFactory.createBlob("hyunsik babo".getBytes()),
+       DatumFactory.createInet4("192.168.0.1"),
+       NullDatum.get(),
+       factory.createDatum(queryid.getProto())
+   });
+   appender.addTuple(tuple);
+   appender.flush();
+   appender.close();
+
+   // The byte count reported by the appender must match the file on disk.
+   FileStatus status = fs.getFileStatus(tablePath);
+   assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+   scanner.init();
+
+   Tuple retrieved;
+   while ((retrieved=scanner.next()) != null) {
+     for (int i = 0; i < tuple.size(); i++) {
+       assertEquals(tuple.get(i), retrieved.get(i));
+     }
+   }
+   scanner.close();
+   // Input statistics are read after close and must agree with what was written.
+   assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+   assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ @Test
+ public void testRCFileBinarySerializeDeserialize() throws IOException {
+ if(storeType != StoreType.RCFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+  * SequenceFile-only: round-trips one 13-column tuple with the text
+  * serializer/deserializer selected, checks that the scanner's key is a LongWritable,
+  * and compares file length and scanner input statistics with the appender's statistics.
+  * Does nothing for other store types.
+  */
+ @Test
+ public void testSequenceFileTextSerializeDeserialize() throws IOException {
+   if (storeType != StoreType.SEQUENCEFILE) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   KeyValueSet options = new KeyValueSet();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+   meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple written = new VTuple(13);
+   Datum[] values = {
+       DatumFactory.createBool(true),
+       DatumFactory.createBit((byte) 0x99),
+       DatumFactory.createChar("jinho"),
+       DatumFactory.createInt2((short) 17),
+       DatumFactory.createInt4(59),
+       DatumFactory.createInt8(23L),
+       DatumFactory.createFloat4(77.9f),
+       DatumFactory.createFloat8(271.9f),
+       DatumFactory.createText("jinho"),
+       DatumFactory.createBlob("hyunsik babo".getBytes()),
+       DatumFactory.createInet4("192.168.0.1"),
+       NullDatum.get(),
+       factory.createDatum(queryid.getProto())
+   };
+   written.put(values);
+   appender.addTuple(written);
+   appender.flush();
+   appender.close();
+
+   // appender-reported bytes vs. actual file size
+   FileStatus status = fs.getFileStatus(tablePath);
+   assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+   scanner.init();
+
+   // with the text serde the scanner is expected to expose a LongWritable key
+   assertTrue(scanner instanceof SequenceFileScanner);
+   SequenceFileScanner sequenceScanner = (SequenceFileScanner) scanner;
+   Writable key = sequenceScanner.getKey();
+   assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+
+   for (Tuple read = scanner.next(); read != null; read = scanner.next()) {
+     for (int col = 0; col < written.size(); col++) {
+       assertEquals(written.get(col), read.get(col));
+     }
+   }
+   scanner.close();
+
+   // scanner input statistics vs. appender statistics
+   assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+   assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+  * SequenceFile-only: round-trips one 13-column tuple with the binary
+  * serializer/deserializer selected, checks that the scanner's key is a BytesWritable,
+  * and compares file length and scanner input statistics with the appender's statistics.
+  * Does nothing for other store types.
+  */
+ @Test
+ public void testSequenceFileBinarySerializeDeserialize() throws IOException {
+   if (storeType != StoreType.SEQUENCEFILE) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   KeyValueSet options = new KeyValueSet();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+   meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+   Appender appender = sm.getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple written = new VTuple(13);
+   Datum[] values = {
+       DatumFactory.createBool(true),
+       DatumFactory.createBit((byte) 0x99),
+       DatumFactory.createChar("jinho"),
+       DatumFactory.createInt2((short) 17),
+       DatumFactory.createInt4(59),
+       DatumFactory.createInt8(23L),
+       DatumFactory.createFloat4(77.9f),
+       DatumFactory.createFloat8(271.9f),
+       DatumFactory.createText("jinho"),
+       DatumFactory.createBlob("hyunsik babo".getBytes()),
+       DatumFactory.createInet4("192.168.0.1"),
+       NullDatum.get(),
+       factory.createDatum(queryid.getProto())
+   };
+   written.put(values);
+   appender.addTuple(written);
+   appender.flush();
+   appender.close();
+
+   // appender-reported bytes vs. actual file size
+   FileStatus status = fs.getFileStatus(tablePath);
+   assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+   scanner.init();
+
+   // with the binary serde the scanner is expected to expose a BytesWritable key
+   assertTrue(scanner instanceof SequenceFileScanner);
+   SequenceFileScanner sequenceScanner = (SequenceFileScanner) scanner;
+   Writable key = sequenceScanner.getKey();
+   assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
+
+   for (Tuple read = scanner.next(); read != null; read = scanner.next()) {
+     for (int col = 0; col < written.size(); col++) {
+       assertEquals(written.get(col), read.get(col));
+     }
+   }
+   scanner.close();
+
+   // scanner input statistics vs. appender statistics
+   assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+   assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ @Test
+ public void testTime() throws IOException {
+ if (storeType == StoreType.CSV || storeType == StoreType.RAW) {
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.DATE);
+ schema.addColumn("col2", Type.TIME);
+ schema.addColumn("col3", Type.TIMESTAMP);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+ Path tablePath = new Path(testDir, "testTime.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ Tuple tuple = new VTuple(3);
+ tuple.put(new Datum[]{
+ DatumFactory.createDate("1980-04-01"),
+ DatumFactory.createTime("12:34:56"),
+ DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved = scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ }
+ }
+
+ @Test
+ public void testSeekableScanner() throws IOException {
+ if (!seekable) {
+ return;
+ }
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("comment", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ Path tablePath = new Path(testDir, "Seekable.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 100000;
+ VTuple vTuple;
+
+ List<Long> offsets = Lists.newArrayList();
+ offsets.add(0L);
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25l));
+ vTuple.put(2, DatumFactory.createText("test" + i));
+ appender.addTuple(vTuple);
+
+ // find a seek position
+ if (i % (tupleNum / 3) == 0) {
+ offsets.add(appender.getOffset());
+ }
+ }
+
+ // end of file
+ if (!offsets.contains(appender.getOffset())) {
+ offsets.add(appender.getOffset());
+ }
+
+ appender.close();
+ if (statsable) {
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+ }
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(status.getLen(), appender.getOffset());
+
+ Scanner scanner;
+ int tupleCnt = 0;
+ long prevOffset = 0;
+ long readBytes = 0;
+ long readRows = 0;
+ for (long offset : offsets) {
+ scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema,
+ new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
+ scanner.init();
+
+ while (scanner.next() != null) {
+ tupleCnt++;
+ }
+
+ scanner.close();
+ if (statsable) {
+ readBytes += scanner.getInputStats().getNumBytes();
+ readRows += scanner.getInputStats().getNumRows();
+ }
+ prevOffset = offset;
+ }
+
+ assertEquals(tupleNum, tupleCnt);
+ if (statsable) {
+ assertEquals(appender.getStats().getNumBytes().longValue(), readBytes);
+ assertEquals(appender.getStats().getNumRows().longValue(), readRows);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
new file mode 100644
index 0000000..4f7ea1c
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
+ *
+ * Every test resolves the Avro schema of the {@code testVariousTypes.avsc} resource through a
+ * different route and compares the result with the schema parsed directly from that file.
+ */
+public class TestAvroUtil {
+  private Schema expected;
+  private URL schemaUrl;
+
+  /**
+   * Locates the schema resource and parses it into the reference schema used by all assertions.
+   */
+  @Before
+  public void setUp() throws Exception {
+    schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
+    assertNotNull(schemaUrl);
+
+    File schemaFile = new File(schemaUrl.getPath());
+    assertTrue(schemaFile.exists());
+
+    expected = new Schema.Parser().parse(schemaFile);
+  }
+
+  /**
+   * Resolves the schema from table options: first from an inline schema literal, then from the
+   * resource path given as the schema URL, and finally from an HTTP URL served by a local
+   * {@link HttpFileServer}.
+   */
+  @Test
+  public void testGetSchema() throws IOException, URISyntaxException {
+    String literal = FileUtil.readTextFile(new File(schemaUrl.getPath()));
+    assertEquals(expected, resolveSchema(StorageConstants.AVRO_SCHEMA_LITERAL, literal));
+
+    assertEquals(expected, resolveSchema(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath()));
+
+    Schema fromHttp;
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      fromHttp = resolveSchema(StorageConstants.AVRO_SCHEMA_URL, httpSchemaUrl(server.getBindAddress()));
+    } finally {
+      server.stop();
+    }
+    // Compared only after the server has been stopped.
+    assertEquals(expected, fromHttp);
+  }
+
+  /**
+   * Fetches the schema straight from an HTTP URL served by a local {@link HttpFileServer}.
+   */
+  @Test
+  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      String url = httpSchemaUrl(server.getBindAddress());
+
+      assertEquals(expected, AvroUtil.getAvroSchemaFromHttp(url));
+    } finally {
+      server.stop();
+    }
+  }
+
+  /**
+   * Loads the schema through the file-system route, using the string form of the resource URL.
+   */
+  @Test
+  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+    Schema fromFileSystem = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
+
+    assertEquals(expected, fromFileSystem);
+  }
+
+  /** Builds an AVRO table meta carrying the single given option and resolves its Avro schema. */
+  private Schema resolveSchema(String optionKey, String optionValue) throws IOException, URISyntaxException {
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(optionKey, optionValue);
+    return AvroUtil.getAvroSchema(meta, new TajoConf());
+  }
+
+  /** Returns the loopback HTTP URL of the schema resource on the port of the given address. */
+  private String httpSchemaUrl(InetSocketAddress addr) {
+    return "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
new file mode 100644
index 0000000..383740d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -0,0 +1,947 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestBSTIndex {
+ private TajoConf conf;
+ private Schema schema;
+ private TableMeta meta;
+
+ private static final int TUPLE_NUM = 10000;
+ private static final int LOAD_NUM = 100;
+ private static final String TEST_PATH = "target/test-data/TestIndex";
+ private Path testDir;
+ private FileSystem fs;
+ private StoreType storeType;
+
+  /**
+   * Creates the fixture for one parameterized run.
+   *
+   * @param type store type used for the table files written by the tests of this run
+   */
+  public TestBSTIndex(StoreType type) {
+    // Table schema shared by all tests: int, long, double, float and string columns.
+    Schema tableSchema = new Schema();
+    tableSchema.addColumn(new Column("int", Type.INT4));
+    tableSchema.addColumn(new Column("long", Type.INT8));
+    tableSchema.addColumn(new Column("double", Type.FLOAT8));
+    tableSchema.addColumn(new Column("float", Type.FLOAT4));
+    tableSchema.addColumn(new Column("string", Type.TEXT));
+
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
+
+    this.storeType = type;
+    this.conf = tajoConf;
+    this.schema = tableSchema;
+  }
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {StoreType.CSV},
+ {StoreType.RAW}
+ });
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ }
+
+  /**
+   * Writes TUPLE_NUM consecutive rows, indexes them on the (long, double) key and checks that
+   * each key is found at the offset of its own row and that reader.next() leads to the row
+   * that follows it.
+   */
+  @Test
+  public void testFindValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    // Write the table: every column of row n carries the value n.
+    Path tablePath = new Path(testDir, "testFindValue_" + storeType);
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = storageManager.getAppender(meta, schema, tablePath);
+    appender.init();
+    for (int n = 0; n < TUPLE_NUM; n++) {
+      Tuple row = new VTuple(5);
+      row.put(0, DatumFactory.createInt4(n));
+      row.put(1, DatumFactory.createInt8(n));
+      row.put(2, DatumFactory.createFloat8(n));
+      row.put(3, DatumFactory.createFloat4(n));
+      row.put(4, DatumFactory.createText("field_" + n));
+      appender.addTuple(row);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+
+    // The index key is the (long, double) column pair.
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+    SortSpec[] sortKeys = {
+        new SortSpec(schema.getColumn("long"), true, false),
+        new SortSpec(schema.getColumn("double"), true, false)
+    };
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    Path indexPath = new Path(testDir, "testFindValue_" + storeType + ".idx");
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter writer = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    writer.setLoadNum(LOAD_NUM);
+    writer.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its key with the offset the scanner reported just before reading it.
+    long rowOffset = scanner.getNextOffset();
+    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
+      Tuple key = new VTuple(2);
+      key.put(0, row.get(1));
+      key.put(1, row.get(2));
+      writer.write(key, rowOffset);
+      rowOffset = scanner.getNextOffset();
+    }
+
+    writer.flush();
+    writer.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // The probe is replaced by the row read back on each iteration, so from the second
+    // iteration on the key values are written into slots 0 and 1 of the previously scanned row.
+    Tuple probe = new VTuple(keySchema.size());
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      probe.put(0, DatumFactory.createInt8(i));
+      probe.put(1, DatumFactory.createFloat8(i));
+      scanner.seek(reader.find(probe));
+      probe = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (probe.get(1).asInt8()) + "]", (i) == (probe.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (probe.get(2).asFloat8()) + "]", (i) == (probe.get(2).asFloat8()));
+
+      // -1 from reader.next() means there is no further entry to verify for this key.
+      long nextOffset = reader.next();
+      if (nextOffset != -1) {
+        scanner.seek(nextOffset);
+        probe = scanner.next();
+        assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (probe.get(0).asInt4()));
+        assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (probe.get(1).asInt8()));
+      }
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  /**
+   * Builds the index while the table is being written: the offset reported by the appender just
+   * before a row is added is stored under that row's (long, double) key. Afterwards every key is
+   * looked up and the rows found at the returned offset and at the offset returned by
+   * reader.next() are verified.
+   */
+  @Test
+  public void testBuildIndexWithAppender() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
+    FileAppender appender = (FileAppender) ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(meta, schema, tablePath);
+    appender.init();
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    Tuple tuple;
+    Tuple keyTuple;
+    long offset;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+
+      // The key schema is (long, double), so the key is built from columns 1 and 2 of the row
+      // instead of handing the whole five-column row to the index writer.
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+
+      // The offset must be taken before the row is appended so that it points at the row start.
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      creater.write(keyTuple, offset);
+    }
+    appender.flush();
+    appender.close();
+
+    creater.flush();
+    creater.close();
+
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    tuple = new VTuple(keySchema.size());
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      tuple.put(0, DatumFactory.createInt8(i));
+      tuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(tuple);
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
+
+      // -1 from reader.next() means there is no further entry to verify for this key.
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  /**
+   * Writes only the even values below TUPLE_NUM, indexes them on (long, double) and checks that
+   * an exact lookup of every odd (absent) key returns -1.
+   */
+  @Test
+  public void testFindOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    // Write the table with even values only.
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = storageManager.getAppender(meta, schema, tablePath);
+    appender.init();
+    for (int even = 0; even < TUPLE_NUM; even += 2) {
+      Tuple row = new VTuple(5);
+      row.put(0, DatumFactory.createInt4(even));
+      row.put(1, DatumFactory.createInt8(even));
+      row.put(2, DatumFactory.createFloat8(even));
+      row.put(3, DatumFactory.createFloat4(even));
+      row.put(4, DatumFactory.createText("field_" + even));
+      appender.addTuple(row);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+
+    // The index key is the (long, double) column pair.
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+    SortSpec[] sortKeys = {
+        new SortSpec(schema.getColumn("long"), true, false),
+        new SortSpec(schema.getColumn("double"), true, false)
+    };
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    Path indexPath = new Path(testDir, "testFindOmittedValue_" + storeType + ".idx");
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter writer = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    writer.setLoadNum(LOAD_NUM);
+    writer.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its key with the offset the scanner reported just before reading it.
+    long rowOffset = scanner.getNextOffset();
+    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
+      Tuple key = new VTuple(2);
+      key.put(0, row.get(1));
+      key.put(1, row.get(2));
+      writer.write(key, rowOffset);
+      rowOffset = scanner.getNextOffset();
+    }
+
+    writer.flush();
+    writer.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+
+    // One probe tuple is reused for all odd keys; none of them was written to the table.
+    Tuple missingKey = new VTuple(2);
+    for (int odd = 1; odd < TUPLE_NUM - 1; odd += 2) {
+      missingKey.put(0, DatumFactory.createInt8(odd));
+      missingKey.put(1, DatumFactory.createFloat8(odd));
+      assertEquals(-1, reader.find(missingKey));
+    }
+    reader.close();
+  }
+
+  /**
+   * Indexes TUPLE_NUM consecutive rows on the (int, long) key and checks that
+   * reader.find(key, true) yields the offset of the row with the next key (i + 1), and that a
+   * following reader.next() yields the row after that (i + 2).
+   */
+  @Test
+  public void testFindNextKeyValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its (int, long) key with the offset reported just before reading it.
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple result;
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]",
+          (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+
+      // -1 from reader.next() means there is no further entry to verify for this key.
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      result = scanner.next();
+      // Column 0 is INT4 and column 1 is INT8; read each with the accessor of its own type,
+      // matching the first pair of assertions above.
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  /**
+   * Writes only the even values below TUPLE_NUM, indexes them on the (int, long) key and checks
+   * that reader.find(key, true) for every odd (absent) key i leads to the row holding i + 1.
+   */
+  @Test
+  public void testFindNextKeyOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its (int, long) key with the offset reported just before reading it.
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple result;
+    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+    }
+    // Release the index reader as well as the scanner, as the sibling tests do.
+    reader.close();
+    scanner.close();
+  }
+
+  /**
+   * Writes rows with values 5 .. TUPLE_NUM + 4, indexes them on the (long, double) key and
+   * probes with the key (0, 0), which is smaller than every stored key: the exact lookup must
+   * return -1, while reader.find(key, true) must lead to the first row (value 5).
+   */
+  @Test
+  public void testFindMinValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    // Use the same "<test>_<storeType>" naming as the index path below and the other tests.
+    Path tablePath = new Path(testDir, "testFindMinValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    Tuple tuple;
+    for (int i = 5; i < TUPLE_NUM + 5; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its (long, double) key with the offset reported just before reading it.
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    tuple = new VTuple(keySchema.size());
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // (0, 0) is below the smallest stored key (5, 5).
+    tuple.put(0, DatumFactory.createInt8(0));
+    tuple.put(1, DatumFactory.createFloat8(0));
+
+    offset = reader.find(tuple);
+    assertEquals(-1, offset);
+
+    offset = reader.find(tuple, true);
+    assertTrue(offset >= 0);
+    scanner.seek(offset);
+    tuple = scanner.next();
+    // Column 1 is INT8 and column 2 is FLOAT8; read each with the accessor of its own type.
+    assertEquals(5L, tuple.get(1).asInt8());
+    assertEquals(5.0d, tuple.get(2).asFloat8(), 0.0d);
+    reader.close();
+    scanner.close();
+  }
+
+  /**
+   * Writes the odd values 5, 7, ... TUPLE_NUM - 1, indexes them on the (int, long) key and checks
+   * that the reader reports (5, 5) as its first key and (TUPLE_NUM - 1, TUPLE_NUM - 1) as its
+   * last key.
+   */
+  @Test
+  public void testMinMax() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testMinMax_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 5; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its (int, long) key with the offset reported just before reading it.
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+
+    // Key slot 0 is the int column and slot 1 the long column; verify both slots of each key
+    // instead of reading slot 0 twice.
+    Tuple min = reader.getFirstKey();
+    assertEquals(5, min.get(0).asInt4());
+    assertEquals(5l, min.get(1).asInt8());
+
+    Tuple max = reader.getLastKey();
+    assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
+    assertEquals(TUPLE_NUM - 1, max.get(1).asInt8());
+    reader.close();
+  }
+
+  /**
+   * Worker that performs 10000 lookups of random (int, long) keys below 10000 against a shared
+   * index reader and remembers whether any lookup missed or threw.
+   */
+  private class ConcurrentAccessor implements Runnable {
+    final BSTIndexReader reader;
+    final Random rnd = new Random(System.currentTimeMillis());
+    // Written by the worker thread and read by the test thread after join().
+    volatile boolean failed = false;
+
+    ConcurrentAccessor(BSTIndexReader reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * @return true if at least one lookup returned -1 or threw an exception
+     */
+    public boolean isFailed() {
+      return this.failed;
+    }
+
+    @Override
+    public void run() {
+      Tuple findKey = new VTuple(2);
+      int keyVal;
+      for (int i = 0; i < 10000; i++) {
+        keyVal = rnd.nextInt(10000);
+        findKey.put(0, DatumFactory.createInt4(keyVal));
+        findKey.put(1, DatumFactory.createInt8(keyVal));
+        try {
+          // Record a miss explicitly: an assertion here would throw an AssertionError, which is
+          // not an Exception, so it would bypass the catch below and never set the flag.
+          if (reader.find(findKey) == -1) {
+            this.failed = true;
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          this.failed = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Indexes TUPLE_NUM consecutive rows on the (int, long) key and lets five ConcurrentAccessor
+   * threads query one shared reader; no accessor may report a failure.
+   */
+  @Test
+  public void testConcurrentAccess() throws IOException, InterruptedException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    // Write the table: every column of row n carries the value n.
+    Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = storageManager.getAppender(meta, schema, tablePath);
+    appender.init();
+    for (int n = 0; n < TUPLE_NUM; n++) {
+      Tuple row = new VTuple(5);
+      row.put(0, DatumFactory.createInt4(n));
+      row.put(1, DatumFactory.createInt8(n));
+      row.put(2, DatumFactory.createFloat8(n));
+      row.put(3, DatumFactory.createFloat4(n));
+      row.put(4, DatumFactory.createText("field_" + n));
+      appender.addTuple(row);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+
+    // The index key is the (int, long) column pair.
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+    SortSpec[] sortKeys = {
+        new SortSpec(schema.getColumn("int"), true, false),
+        new SortSpec(schema.getColumn("long"), true, false)
+    };
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    Path indexPath = new Path(testDir, "testConcurrentAccess_" + storeType + ".idx");
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter writer = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    writer.setLoadNum(LOAD_NUM);
+    writer.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its key with the offset the scanner reported just before reading it.
+    long rowOffset = scanner.getNextOffset();
+    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
+      Tuple key = new VTuple(2);
+      key.put(0, row.get(0));
+      key.put(1, row.get(1));
+      writer.write(key, rowOffset);
+      rowOffset = scanner.getNextOffset();
+    }
+
+    writer.flush();
+    writer.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+
+    // Start all workers on the same reader, then wait for each one and check its outcome.
+    final int workerCount = 5;
+    Thread[] workers = new Thread[workerCount];
+    ConcurrentAccessor[] accessors = new ConcurrentAccessor[workerCount];
+    for (int w = 0; w < workerCount; w++) {
+      ConcurrentAccessor accessor = new ConcurrentAccessor(reader);
+      Thread worker = new Thread(accessor);
+      accessors[w] = accessor;
+      workers[w] = worker;
+      worker.start();
+    }
+
+    for (int w = 0; w < workerCount; w++) {
+      workers[w].join();
+      assertFalse(accessors[w].isFailed());
+    }
+    reader.close();
+  }
+
+
+  /**
+   * Writes the values TUPLE_NUM - 1 down to 0, indexes them on the (long, double) key with the
+   * sort flag set to false, and checks that each key is found at the offset of its own row and
+   * that reader.next() leads to the row holding the next smaller value.
+   */
+  @Test
+  public void testFindValueDescOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    // Write the table in descending order: every column of a row carries the same value.
+    Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = storageManager.getAppender(meta, schema, tablePath);
+    appender.init();
+    for (int n = TUPLE_NUM - 1; n >= 0; n--) {
+      Tuple row = new VTuple(5);
+      row.put(0, DatumFactory.createInt4(n));
+      row.put(1, DatumFactory.createInt8(n));
+      row.put(2, DatumFactory.createFloat8(n));
+      row.put(3, DatumFactory.createFloat4(n));
+      row.put(4, DatumFactory.createText("field_" + n));
+      appender.addTuple(row);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+
+    // The index key is the (long, double) column pair.
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+    SortSpec[] sortKeys = {
+        new SortSpec(schema.getColumn("long"), false, false),
+        new SortSpec(schema.getColumn("double"), false, false)
+    };
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    Path indexPath = new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx");
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter writer = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    writer.setLoadNum(LOAD_NUM);
+    writer.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // For every row, store its key with the offset the scanner reported just before reading it.
+    long rowOffset = scanner.getNextOffset();
+    for (Tuple row = scanner.next(); row != null; row = scanner.next()) {
+      Tuple key = new VTuple(2);
+      key.put(0, row.get(1));
+      key.put(1, row.get(2));
+      writer.write(key, rowOffset);
+      rowOffset = scanner.getNextOffset();
+    }
+
+    writer.flush();
+    writer.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    // The probe is replaced by the row read back on each iteration, so from the second
+    // iteration on the key values are written into slots 0 and 1 of the previously scanned row.
+    Tuple probe = new VTuple(keySchema.size());
+    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
+      probe.put(0, DatumFactory.createInt8(i));
+      probe.put(1, DatumFactory.createFloat8(i));
+      scanner.seek(reader.find(probe));
+      probe = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (probe.get(1).asInt8()) + "]", (i) == (probe.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (probe.get(2).asFloat8()) + "]", (i) == (probe.get(2).asFloat8()));
+
+      // -1 from reader.next() means there is no further entry to verify for this key.
+      long nextOffset = reader.next();
+      if (nextOffset != -1) {
+        scanner.seek(nextOffset);
+        probe = scanner.next();
+        assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (probe.get(0).asInt4()));
+        assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (probe.get(1).asInt8()));
+      }
+    }
+    reader.close();
+    scanner.close();
+  }
+
+ /**
+  * Indexes a table whose rows were appended from key {@code TUPLE_NUM - 1} down to 0 and checks
+  * that {@code find(key, true)} lands on the row after the searched key (key - 1 in this layout),
+  * and that a following {@code next()} advances one more row (key - 2).
+  *
+  * @throws IOException if writing the table, building the index or reading either of them fails
+  */
+ @Test
+ public void testFindNextKeyValueDescOrder() throws IOException {
+   meta = CatalogUtil.newTableMeta(storeType);
+
+   Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
+   Path indexPath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx");
+
+   // Write the table with keys running from TUPLE_NUM - 1 down to 0.
+   Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+   appender.init();
+   Tuple tuple;
+   for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
+     tuple = new VTuple(5);
+     tuple.put(0, DatumFactory.createInt4(i));
+     tuple.put(1, DatumFactory.createInt8(i));
+     tuple.put(2, DatumFactory.createFloat8(i));
+     tuple.put(3, DatumFactory.createFloat4(i));
+     tuple.put(4, DatumFactory.createText("field_" + i));
+     appender.addTuple(tuple);
+   }
+   appender.close();
+
+   FileStatus status = fs.getFileStatus(tablePath);
+   long fileLen = status.getLen();
+   FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+   // NOTE(review): the two 'false' flags presumably select descending order / nulls last,
+   // matching the order the rows were written in -- confirm against SortSpec.
+   SortSpec[] sortKeys = new SortSpec[2];
+   sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false);
+   sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false);
+
+   Schema keySchema = new Schema();
+   keySchema.addColumn(new Column("int", Type.INT4));
+   keySchema.addColumn(new Column("long", Type.INT8));
+
+   BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+   // Build the index: one (int, long) key per row, mapped to the row's file offset.
+   BSTIndex bst = new BSTIndex(conf);
+   BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+   creater.setLoadNum(LOAD_NUM);
+   creater.open();
+
+   SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+   scanner.init();
+
+   Tuple keyTuple;
+   long offset;
+   while (true) {
+     keyTuple = new VTuple(2);
+     // The offset has to be captured before next() consumes the row.
+     offset = scanner.getNextOffset();
+     tuple = scanner.next();
+     if (tuple == null) break;
+
+     keyTuple.put(0, tuple.get(0));
+     keyTuple.put(1, tuple.get(1));
+     creater.write(keyTuple, offset);
+   }
+
+   creater.flush();
+   creater.close();
+   scanner.close();
+
+   BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+   reader.open();
+   try {
+     assertEquals(keySchema, reader.getKeySchema());
+     assertEquals(comp, reader.getComparator());
+
+     scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+     scanner.init();
+     try {
+       Tuple result;
+       for (int i = (TUPLE_NUM - 1); i > 0; i--) {
+         keyTuple = new VTuple(2);
+         keyTuple.put(0, DatumFactory.createInt4(i));
+         keyTuple.put(1, DatumFactory.createInt8(i));
+
+         // find(key, true) is expected to return the offset of the row that follows 'key'.
+         long offsets = reader.find(keyTuple, true);
+         scanner.seek(offsets);
+         result = scanner.next();
+         assertEquals("[seek check " + (i - 1) + " ]", i - 1, result.get(0).asInt4());
+         assertEquals("[seek check " + (i - 1) + " ]", i - 1, result.get(1).asInt8());
+
+         // -1 means the index has no further entry; nothing more to verify for this key.
+         offsets = reader.next();
+         if (offsets == -1) {
+           continue;
+         }
+         scanner.seek(offsets);
+         result = scanner.next();
+         // Column 0 is INT4 and column 1 is INT8, so read them with the matching accessors.
+         assertEquals("[seek check " + (i - 2) + " ]", i - 2, result.get(0).asInt4());
+         assertEquals("[seek check " + (i - 2) + " ]", i - 2, result.get(1).asInt8());
+       }
+     } finally {
+       // Release the scanner even when an assertion above fails.
+       scanner.close();
+     }
+   } finally {
+     reader.close();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
new file mode 100644
index 0000000..d7c9f49
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.storage.CSVFile.CSVScanner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests BST index lookups against a single CSV file: a table of {@code TUPLE_NUM} rows is
+ * written, a two-level index is built from a scan of that file, and the offsets returned by the
+ * index reader are verified by seeking a {@code CSVScanner} to them.
+ */
+public class TestSingleCSVFileBSTIndex {
+
+  private TajoConf conf;
+  private Schema schema;
+  private TableMeta meta;
+  private FileSystem fs;
+
+  private static final int TUPLE_NUM = 10000;
+  private static final int LOAD_NUM = 100;
+  private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex";
+  private Path testDir;
+
+  /** Creates the configuration and the five-column schema shared by every test. */
+  public TestSingleCSVFileBSTIndex() {
+    conf = new TajoConf();
+    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+    schema = new Schema();
+    schema.addColumn(new Column("int", Type.INT4));
+    schema.addColumn(new Column("long", Type.INT8));
+    schema.addColumn(new Column("double", Type.FLOAT8));
+    schema.addColumn(new Column("float", Type.FLOAT4));
+    schema.addColumn(new Column("string", Type.TEXT));
+  }
+
+  /** Resolves the test directory and the file system it lives on. */
+  @Before
+  public void setUp() throws Exception {
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  /**
+   * Looks up every (long, double) key with {@code find(key)} and checks that the returned offset
+   * points at the row with that key, and that {@code next()} then points at the following row.
+   *
+   * @throws IOException if writing the table, building the index or reading either of them fails
+   */
+  @Test
+  public void testFindValueInSingleCSV() throws IOException {
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
+    FileFragment tablet = writeTable(tablePath);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    Path indexPath = new Path(testDir, "FindValueInCSV.idx");
+    // Key columns are "long" (index 1) and "double" (index 2) of the table schema.
+    buildIndex(bst, indexPath, keySchema, comp, tablet, 1, 2);
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+    try {
+      SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
+      fileScanner.init();
+      try {
+        for (int i = 0; i < TUPLE_NUM - 1; i++) {
+          // Use a fresh key per lookup so a scanned row is never reused as the search key.
+          Tuple keyTuple = new VTuple(keySchema.size());
+          keyTuple.put(0, DatumFactory.createInt8(i));
+          keyTuple.put(1, DatumFactory.createFloat8(i));
+
+          long offsets = reader.find(keyTuple);
+          fileScanner.seek(offsets);
+          Tuple result = fileScanner.next();
+          assertEquals(i, result.get(1).asInt8());
+          assertEquals(i, result.get(2).asFloat8(), 0.01);
+
+          // -1 means the index has no further entry; nothing more to verify for this key.
+          offsets = reader.next();
+          if (offsets == -1) {
+            continue;
+          }
+          fileScanner.seek(offsets);
+          result = fileScanner.next();
+          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(0).asInt4());
+          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(1).asInt8());
+        }
+      } finally {
+        // Release the scanner even when an assertion above fails.
+        fileScanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Looks up every (int, long) key with {@code find(key, true)} and checks that the returned
+   * offset points at the row after the searched key, and that {@code next()} advances once more.
+   *
+   * @throws IOException if writing the table, building the index or reading either of them fails
+   */
+  @Test
+  public void testFindNextKeyValueInSingleCSV() throws IOException {
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
+        "table1.csv");
+    FileFragment tablet = writeTable(tablePath);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    Path indexPath = new Path(testDir, "FindNextKeyValueInCSV.idx");
+    // Key columns are "int" (index 0) and "long" (index 1) of the table schema.
+    buildIndex(bst, indexPath, keySchema, comp, tablet, 0, 1);
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.open();
+    try {
+      SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
+      fileScanner.init();
+      try {
+        for (int i = 0; i < TUPLE_NUM - 1; i++) {
+          Tuple keyTuple = new VTuple(2);
+          keyTuple.put(0, DatumFactory.createInt4(i));
+          keyTuple.put(1, DatumFactory.createInt8(i));
+
+          long offsets = reader.find(keyTuple, true);
+          fileScanner.seek(offsets);
+          Tuple result = fileScanner.next();
+          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(0).asInt4());
+          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(1).asInt8());
+
+          // -1 means the index has no further entry; nothing more to verify for this key.
+          offsets = reader.next();
+          if (offsets == -1) {
+            continue;
+          }
+          fileScanner.seek(offsets);
+          result = fileScanner.next();
+          // Column 0 is INT4 and column 1 is INT8, so read them with the matching accessors.
+          assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.get(0).asInt4());
+          assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.get(1).asInt8());
+        }
+      } finally {
+        // Release the scanner even when an assertion above fails.
+        fileScanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /** Writes TUPLE_NUM rows (every column derived from the row number) and returns a fragment covering the file. */
+  private FileFragment writeTable(Path tablePath) throws IOException {
+    fs.mkdirs(tablePath.getParent());
+
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      Tuple tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    return new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+  }
+
+  /** Scans the fragment and writes one two-column key per row, mapped to that row's offset, into a two-level index. */
+  private void buildIndex(BSTIndex bst, Path indexPath, Schema keySchema, BaseTupleComparator comp,
+                          FileFragment tablet, int firstKeyColumn, int secondKeyColumn) throws IOException {
+    BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
+    fileScanner.init();
+    while (true) {
+      // The offset has to be captured before next() consumes the row.
+      long offset = fileScanner.getNextOffset();
+      Tuple tuple = fileScanner.next();
+      if (tuple == null) {
+        break;
+      }
+
+      Tuple keyTuple = new VTuple(2);
+      keyTuple.put(0, tuple.get(firstKeyColumn));
+      keyTuple.put(1, tuple.get(secondKeyColumn));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    fileScanner.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
new file mode 100644
index 0000000..109fed9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Round-trip test for the Parquet writer and reader: one tuple covering every column type of
+ * the test schema is written to a local temporary file and read back.
+ */
+public class TestReadWrite {
+  private static final String HELLO = "hello";
+
+  /**
+   * Returns a qualified local-file-system path to a temporary file name that does not exist yet.
+   *
+   * @throws IOException if the temporary file cannot be created
+   */
+  private Path createTmpFile() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    // Only the unique name is needed; the empty file itself is removed again.
+    // NOTE(review): presumably because the writer must create the file itself -- confirm.
+    tmp.delete();
+
+    // it prevents accessing HDFS namenode of TajoTestingCluster.
+    LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration());
+    return localFS.makeQualified(new Path(tmp.getPath()));
+  }
+
+  /** Builds a schema with one column per type exercised by {@link #testAll()}. */
+  private Schema createAllTypesSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("mybit", Type.BIT));
+    columns.add(new Column("mychar", Type.CHAR));
+    columns.add(new Column("myint2", Type.INT2));
+    columns.add(new Column("myint4", Type.INT4));
+    columns.add(new Column("myint8", Type.INT8));
+    columns.add(new Column("myfloat4", Type.FLOAT4));
+    columns.add(new Column("myfloat8", Type.FLOAT8));
+    columns.add(new Column("mytext", Type.TEXT));
+    columns.add(new Column("myblob", Type.BLOB));
+    columns.add(new Column("mynull", Type.NULL_TYPE));
+    return new Schema(columns.toArray(new Column[columns.size()]));
+  }
+
+  /**
+   * Writes a single tuple with a value for every column and verifies each field after reading
+   * it back.
+   *
+   * @throws Exception if the temporary file, the writer or the reader fails
+   */
+  @Test
+  public void testAll() throws Exception {
+    Path file = createTmpFile();
+    Schema schema = createAllTypesSchema();
+    Tuple tuple = new VTuple(schema.size());
+    tuple.put(0, DatumFactory.createBool(true));
+    // (byte)128 wraps to -128; the same cast is used for the expected value below.
+    tuple.put(1, DatumFactory.createBit((byte)128));
+    tuple.put(2, DatumFactory.createChar('t'));
+    tuple.put(3, DatumFactory.createInt2((short)2048));
+    tuple.put(4, DatumFactory.createInt4(4096));
+    tuple.put(5, DatumFactory.createInt8(8192L));
+    tuple.put(6, DatumFactory.createFloat4(0.2f));
+    tuple.put(7, DatumFactory.createFloat8(4.1));
+    tuple.put(8, DatumFactory.createText(HELLO));
+    tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8)));
+    tuple.put(10, NullDatum.get());
+
+    TajoParquetWriter writer = new TajoParquetWriter(file, schema);
+    try {
+      writer.write(tuple);
+    } finally {
+      // Close the writer even if the write fails so the file handle is released.
+      writer.close();
+    }
+
+    TajoParquetReader reader = new TajoParquetReader(file, schema);
+    try {
+      tuple = reader.read();
+
+      assertNotNull(tuple);
+      assertTrue(tuple.getBool(0));
+      assertEquals((byte)128, tuple.getByte(1));
+      assertEquals(String.valueOf('t'), String.valueOf(tuple.getChar(2)));
+      assertEquals((short)2048, tuple.getInt2(3));
+      assertEquals(4096, tuple.getInt4(4));
+      assertEquals(8192L, tuple.getInt8(5));
+      // Values must survive the round trip exactly, hence a zero delta.
+      assertEquals(0.2f, tuple.getFloat4(6), 0.0f);
+      assertEquals(4.1, tuple.getFloat8(7), 0.0);
+      assertEquals(HELLO, tuple.getText(8));
+      assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), tuple.getBytes(9));
+      assertEquals(NullDatum.get(), tuple.get(10));
+    } finally {
+      // The reader was previously never closed.
+      reader.close();
+    }
+  }
+}