You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:23 UTC
[09/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
new file mode 100644
index 0000000..07e8dd7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the line-oriented readers of the text storage format:
+ * {@link ByteBufLineReader} and {@link DelimitedLineReader}.
+ */
+public class TestLineReader {
+  /** Directory under which the files written by these tests are created. */
+  private static final String TEST_PATH = "target/test-data/TestLineReader";
+
+  /**
+   * Writes 10,000 rows with the TEXTFILE appender, reads the file back line by line through a
+   * {@link ByteBufLineReader}, and checks that the line count and the byte counts reported by
+   * the reader match the written file.
+   */
+  @Test
+  public void testByteBufLineReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    Path tablePath = new Path(testDir, "line.data");
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int i = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        // The byte count is accumulated before the null check, exactly as the reader reports it.
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        i++;
+      }
+    } finally {
+      // Release the reader, channel and file system even when reading fails.
+      IOUtils.cleanup(null, reader, channel, fs);
+    }
+    assertEquals(tupleNum, i);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+
+  /**
+   * Writes 10,000 rows into a Deflate-compressed text file and reads them back through a
+   * {@link DelimitedLineReader} whose fragment ends at the offset recorded half way through
+   * the write. All rows are still expected back.
+   */
+  @Test
+  public void testLineDelimitedReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    long splitOffset = 0;
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+
+      // Remember the appender offset at the midpoint; it becomes the fragment length below.
+      if (i == (tupleNum / 2)) {
+        splitOffset = appender.getOffset();
+      }
+    }
+    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
+    appender.close();
+
+    tablePath = tablePath.suffix(extension);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
+    // if file is compressed, will read to EOF
+    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
+
+    int i = 0;
+    try {
+      assertTrue(reader.isCompressed());
+      assertFalse(reader.isReadable());
+      reader.init();
+      assertTrue(reader.isReadable());
+
+      while (reader.isReadable()) {
+        ByteBuf buf = reader.readLine();
+        if (buf == null) {
+          break;
+        }
+        i++;
+      }
+    } finally {
+      // Release the reader and file system even when an assertion or a read fails.
+      IOUtils.cleanup(null, reader, fs);
+    }
+    assertEquals(tupleNum, i);
+  }
+
+  /**
+   * Reads the bundled resource {@code dataset/testLineText.txt} through a
+   * {@link ByteBufLineReader} and checks the reported byte counts against the file length and
+   * the line count against the number of {@code "\n"}-separated pieces of the file content.
+   * NOTE(review): the test name suggests the resource has no trailing line terminator; confirm
+   * against the resource file.
+   */
+  @Test
+  public void testByteBufLineReaderWithoutTerminating() throws IOException {
+    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+    File file = new File(path);
+    String data = FileUtil.readTextFile(file);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int i = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        i++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader);
+    }
+    assertEquals(file.length(), totalRead);
+    assertEquals(file.length(), reader.readBytes());
+    assertEquals(data.split("\n").length, i);
+  }
+
+  /**
+   * Writes two CRLF-terminated rows ("0" and "1") and reads them back through a
+   * {@link ByteBufLineReader} backed by a 2-byte buffer, checking each row's content and the
+   * total number of bytes consumed.
+   */
+  @Test
+  public void testCRLFLine() throws IOException {
+    // Use one explicit charset for writing and decoding so the test does not depend on the
+    // platform default encoding.
+    Charset utf8 = Charset.forName("UTF-8");
+
+    TajoConf conf = new TajoConf();
+    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
+
+    FileSystem fs = testFile.getFileSystem(conf);
+    FSDataOutputStream outputStream = fs.create(testFile, true);
+    outputStream.write("0\r\n1\r\n".getBytes(utf8));
+    outputStream.flush();
+    IOUtils.closeStream(outputStream);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+    // NOTE(review): the 2-byte buffer presumably forces CR and LF into separate refills; confirm.
+    ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
+    FileStatus status = fs.getFileStatus(testFile);
+
+    long totalRead = 0;
+    int i = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        String row = buf.toString(utf8);
+        // Row n must contain exactly the digit n, i.e. no stray CR or LF is left in the line.
+        assertEquals(i, Integer.parseInt(row));
+        i++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader);
+    }
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..a0daa7d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+ private TajoConf conf;
+ StorageManager sm;
+ private static String TEST_PATH = "target/test-data/TestMergeScanner";
+
+ private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testMultipleFiles\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"id\", \"type\": \"int\" },\n" +
+ " { \"name\": \"file\", \"type\": \"string\" },\n" +
+ " { \"name\": \"name\", \"type\": \"string\" },\n" +
+ " { \"name\": \"age\", \"type\": \"long\" }\n" +
+ " ]\n" +
+ "}\n";
+
+ private Path testDir;
+ private StoreType storeType;
+ private FileSystem fs;
+
+ public TestMergeScanner(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {StoreType.CSV},
+ {StoreType.RAW},
+ {StoreType.RCFILE},
+ {StoreType.PARQUET},
+ {StoreType.SEQUENCEFILE},
+ {StoreType.AVRO},
+ // RowFile requires Byte-buffer read support, so we omitted RowFile.
+ //{StoreType.ROWFILE},
+ });
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new TajoConf();
+ conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+ conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
+ }
+
+ @Test
+ public void testMultipleFiles() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("file", Type.TEXT);
+ schema.addColumn("name", Type.TEXT);
+ schema.addColumn("age", Type.INT8);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+ }
+
+ Path table1Path = new Path(testDir, storeType + "_1.data");
+ Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+ appender1.enableStats();
+ appender1.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+ vTuple.put(3, DatumFactory.createInt8(25l));
+ appender1.addTuple(vTuple);
+ }
+ appender1.close();
+
+ TableStats stat1 = appender1.getStats();
+ if (stat1 != null) {
+ assertEquals(tupleNum, stat1.getNumRows().longValue());
+ }
+
+ Path table2Path = new Path(testDir, storeType + "_2.data");
+ Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+ appender2.enableStats();
+ appender2.init();
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+ vTuple.put(3, DatumFactory.createInt8(25l));
+ appender2.addTuple(vTuple);
+ }
+ appender2.close();
+
+ TableStats stat2 = appender2.getStats();
+ if (stat2 != null) {
+ assertEquals(tupleNum, stat2.getNumRows().longValue());
+ }
+
+
+ FileStatus status1 = fs.getFileStatus(table1Path);
+ FileStatus status2 = fs.getFileStatus(table2Path);
+ Fragment[] fragment = new Fragment[2];
+ fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+ fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+ Schema targetSchema = new Schema();
+ targetSchema.addColumn(schema.getColumn(0));
+ targetSchema.addColumn(schema.getColumn(2));
+
+ Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
+ assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+ scanner.init();
+ int totalCounts = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ totalCounts++;
+ if (isProjectableStorage(meta.getStoreType())) {
+ assertNotNull(tuple.get(0));
+ assertNull(tuple.get(1));
+ assertNotNull(tuple.get(2));
+ assertNull(tuple.get(3));
+ }
+ }
+ scanner.close();
+
+ assertEquals(tupleNum * 2, totalCounts);
+ }
+
+ private static boolean isProjectableStorage(StoreType type) {
+ switch (type) {
+ case RCFILE:
+ case PARQUET:
+ case SEQUENCEFILE:
+ case CSV:
+ case AVRO:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
new file mode 100644
index 0000000..12ea551
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.text.FieldSplitProcessor;
+import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static io.netty.util.ReferenceCountUtil.releaseLater;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the byte processors used with {@code ByteBuf.forEachByte} to locate field and line
+ * boundaries: {@link FieldSplitProcessor} and {@link LineSplitProcessor}.
+ */
+public class TestSplitProcessor {
+
+  /**
+   * Scans {@code "abc||de"} for the {@code '|'} delimiter from several start offsets and checks
+   * the index at which each scan stops ({@code -1} when no delimiter remains).
+   */
+  @Test
+  public void testFieldSplitProcessor() throws IOException {
+    String data = "abc||de";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    FieldSplitProcessor processor = new FieldSplitProcessor('|');
+
+    // first delimiter, right after "abc"
+    assertEquals(3, buf.forEachByte(0, len, processor));
+    // scan starts on the second delimiter itself
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    // no delimiter left in "de"
+    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+  }
+
+  /**
+   * Scans {@code "abc\r\n\n"} for line terminators and checks where each scan stops, including
+   * the CR-seen flag that tells the caller to skip the LF of a CRLF pair.
+   */
+  @Test
+  public void testLineSplitProcessor() throws IOException {
+    String data = "abc\r\n\n";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    LineSplitProcessor processor = new LineSplitProcessor();
+
+    //find CR
+    assertEquals(3, buf.forEachByte(0, len, processor));
+
+    // find CRLF
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    // expected value first, so a failure message reports the values the right way round
+    assertEquals('\n', buf.getByte(4));
+    // need to skip LF
+    assertTrue(processor.isPrevCharCR());
+
+    // find LF
+    assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
new file mode 100644
index 0000000..15998f2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -0,0 +1,878 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestStorages";
+
+ /**
+  * Avro schema literal set as {@code StorageConstants.AVRO_SCHEMA_LITERAL} by
+  * {@code testProjection} when the store type is AVRO. Its fields mirror that test's
+  * table schema: id (INT4), age (INT8) and score (FLOAT4).
+  */
+ private static String TEST_PROJECTION_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testProjection\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"id\", \"type\": \"int\" },\n" +
+ " { \"name\": \"age\", \"type\": \"long\" },\n" +
+ " { \"name\": \"score\", \"type\": \"float\" }\n" +
+ " ]\n" +
+ "}\n";
+
+ /**
+  * Avro schema literal set as {@code StorageConstants.AVRO_SCHEMA_LITERAL} by
+  * {@code testNullHandlingTypes} when the store type is AVRO. Every field except col11 is
+  * declared as a union with "null"; col11 is of type "null" itself.
+  */
+ private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testNullHandlingTypes\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
+ " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
+ " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
+ " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
+ " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
+ " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
+ " { \"name\": \"col11\", \"type\": \"null\" },\n" +
+ " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
+ " ]\n" +
+ "}\n";
+
+ private StoreType storeType;
+ private boolean splitable;
+ private boolean statsable;
+ private boolean seekable;
+ private Path testDir;
+ private FileSystem fs;
+
+  /**
+   * Creates the fixture for one parameter row.
+   *
+   * @param type      storage format under test
+   * @param splitable whether the format is exercised by the split test
+   * @param statsable stored as-is for the tests of this class
+   * @param seekable  stored as-is for the tests of this class
+   * @throws IOException if the test directory or its file system cannot be obtained
+   */
+  public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
+    TajoConf tajoConf = new TajoConf();
+    if (type == StoreType.RCFILE) {
+      // RCFile only: use a record interval of 100 for the files written by these tests.
+      tajoConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+    }
+
+    this.storeType = type;
+    this.splitable = splitable;
+    this.statsable = statsable;
+    this.seekable = seekable;
+    this.conf = tajoConf;
+
+    Path dir = CommonTestingUtil.getTestDir(TEST_PATH);
+    this.testDir = dir;
+    this.fs = dir.getFileSystem(tajoConf);
+  }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ //type, splitable, statsable, seekable
+ {StoreType.CSV, true, true, true},
+ {StoreType.RAW, false, true, true},
+ {StoreType.RCFILE, true, true, false},
+ {StoreType.PARQUET, false, false, false},
+ {StoreType.SEQUENCEFILE, true, true, false},
+ {StoreType.AVRO, false, false, false},
+ {StoreType.TEXTFILE, true, true, false},
+ {StoreType.JSON, true, true, false},
+ });
+ }
+
+  /**
+   * For splittable store types: writes 10,000 rows, cuts the file into two fragments at a
+   * random byte offset, scans both fragments and checks that together they yield every row
+   * exactly once. Does nothing for store types flagged as not splittable.
+   */
+  @Test
+  public void testSplitable() throws IOException {
+    if (splitable) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+      Appender appender = sm.getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for (int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25l));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      // Random split point in [1, fileLen]; it is reported on failure so a run can be reproduced.
+      long randomNum = (long) (Math.random() * fileLen) + 1;
+
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      scanner = sm.getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      assertEquals("storeType=" + storeType + ", split offset=" + randomNum + ", file length=" + fileLen,
+          tupleNum, tupleCnt);
+    }
+  }
+
+  /**
+   * RCFile only: writes 10,000 rows, splits the file at byte 122 (the header size, per the
+   * original author), scans both fragments and checks that together they return every row.
+   */
+  @Test
+  public void testRCFileSplitable() throws IOException {
+    if (storeType != StoreType.RCFILE) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    Path tablePath = new Path(testDir, "Splitable.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender writer = sm.getAppender(meta, schema, tablePath);
+    writer.enableStats();
+    writer.init();
+
+    final int expectedRows = 10000;
+    for (int id = 1; id <= expectedRows; id++) {
+      VTuple row = new VTuple(2);
+      row.put(0, DatumFactory.createInt4(id));
+      row.put(1, DatumFactory.createInt8(25l));
+      writer.addTuple(row);
+    }
+    writer.close();
+    TableStats stats = writer.getStats();
+    assertEquals(expectedRows, stats.getNumRows().longValue());
+
+    long fileLen = fs.getFileStatus(tablePath).getLen();
+    long headerSize = 122; // header size
+
+    FileFragment[] parts = {
+        new FileFragment("Splitable", tablePath, 0, headerSize),
+        new FileFragment("Splitable", tablePath, headerSize, (fileLen - headerSize))
+    };
+
+    // Scan the fragments one after the other and add up the rows each one returns.
+    int rowsRead = 0;
+    for (FileFragment part : parts) {
+      Scanner scanner = sm.getScanner(meta, schema, part, schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.next() != null) {
+        rowsRead++;
+      }
+      scanner.close();
+    }
+
+    assertEquals(expectedRows, rowsRead);
+  }
+
+  /**
+   * Writes 10,000 rows of (id, age, score), scans them with a target schema of (age, score) and
+   * checks the projected values row by row. For the store types listed below the unprojected
+   * id column must come back as {@code null}.
+   */
+  @Test
+  public void testProjection() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testProjection.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(i + 2));
+      vTuple.put(2, DatumFactory.createFloat4(i + 3));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+    Schema target = new Schema();
+    target.addColumn("age", Type.INT8);
+    target.addColumn("score", Type.FLOAT4);
+    Scanner scanner = sm.getScanner(meta, schema, fragment, target);
+    scanner.init();
+
+    // Store types for which the unprojected column is expected to be left null.
+    boolean dropsUnprojected = storeType == StoreType.RCFILE
+        || storeType == StoreType.CSV
+        || storeType == StoreType.PARQUET
+        || storeType == StoreType.SEQUENCEFILE
+        || storeType == StoreType.AVRO;
+
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      if (dropsUnprojected) {
+        assertTrue("unprojected column 'id' should be null at row " + tupleCnt, tuple.get(0) == null);
+      }
+      // assertEquals (rather than assertTrue(a == b)) so a failure shows both values.
+      assertEquals(tupleCnt + 2, tuple.get(1).asInt8());
+      // The written values are small integers, exactly representable as float: zero delta.
+      assertEquals(tupleCnt + 3, tuple.get(2).asFloat4(), 0.0);
+      tupleCnt++;
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  /**
+   * Writes a single tuple containing one value of every supported column type, reads it back
+   * and checks each column for equality. The PROTOBUF column is left out for JSON.
+   */
+  @Test
+  public void testVariousTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
+    }
+
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("hyunsik"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get()
+    });
+    if (handleProtobuf) {
+      tuple.put(11, factory.createDatum(queryid.getProto()));
+    }
+
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = sm.getScanner(meta, schema, fragment);
+    scanner.init();
+
+    int readCount = 0;
+    Tuple retrieved;
+    while ((retrieved = scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+      readCount++;
+    }
+    scanner.close();
+
+    // Without this check the test would pass even if the scanner returned no tuple at all.
+    assertEquals(1, readCount);
+  }
+
+  /**
+   * Checks that a NULL written at any column position is read back as NULL.
+   *
+   * <p>A seed tuple holding one value per column is built first. One row is then written per
+   * column, identical to the seed except that the column at the row's own index is replaced by
+   * NULL. Every row read back must have the schema's column count, NULL at the expected position
+   * and the seed value everywhere else, and the number of rows read must equal the number written.
+   * The PROTOBUF column is left out when the store type is JSON.
+   *
+   * @throws IOException if writing or scanning the table file fails
+   */
+  @Test
+  public void testNullHandlingTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+    meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+    int columnNum = 11 + (handleProtobuf ? 1 : 0);
+    Tuple seedTuple = new VTuple(columnNum);
+    seedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),                // col1
+        DatumFactory.createChar("hyunsik"),           // col2
+        DatumFactory.createInt2((short) 17),          // col3
+        DatumFactory.createInt4(59),                  // col4
+        DatumFactory.createInt8(23l),                 // col5
+        DatumFactory.createFloat4(77.9f),             // col6
+        DatumFactory.createFloat8(271.9f),            // col7
+        DatumFactory.createText("hyunsik"),           // col8
+        DatumFactory.createBlob("hyunsik".getBytes()),// col9
+        DatumFactory.createInet4("192.168.0.1"),      // col10
+        NullDatum.get(),                              // col11
+    });
+
+    if (handleProtobuf) {
+      seedTuple.put(11, factory.createDatum(queryid.getProto())); // col12
+    }
+
+    // Making tuples with different null column positions
+    Tuple tuple;
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
+        if (i == j) { // i'th column will have NULL value
+          tuple.put(j, NullDatum.get());
+        } else {
+          tuple.put(j, seedTuple.get(j));
+        }
+      }
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    // Index of the row being verified; row N was written with NULL in column N.
+    int rowIndex = 0;
+    try {
+      Tuple retrieved;
+      while ((retrieved = scanner.next()) != null) {
+        assertEquals(columnNum, retrieved.size());
+        for (int j = 0; j < columnNum; j++) {
+          if (rowIndex == j) {
+            assertEquals(NullDatum.get(), retrieved.get(j));
+          } else {
+            assertEquals(seedTuple.get(j), retrieved.get(j));
+          }
+        }
+
+        rowIndex++;
+      }
+    } finally {
+      // Release the scanner even when one of the assertions above fails.
+      scanner.close();
+    }
+
+    // Every written row must come back; otherwise the per-row checks would pass vacuously.
+    assertEquals(columnNum, rowIndex);
+  }
+
+  /**
+   * Round-trips one 13-column tuple through an RCFile table configured with
+   * {@link TextSerializerDeserializer} and checks the values and the byte/row statistics.
+   *
+   * <p>Returns immediately for store types other than RCFILE. The serde is set through
+   * {@code StorageConstants.RCFILE_SERDE}, the same key the binary RCFile test uses, so that the
+   * text serde is the one the table is configured with.
+   *
+   * @throws IOException if writing or scanning the table file fails
+   */
+  @Test
+  public void testRCFileTextSerializeDeserialize() throws IOException {
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    // Was CSVFILE_SERDE, which is not the key the RCFile tests in this class use to pick a serde.
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    // The appender's byte count must match the size of the file it produced.
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    try {
+      Tuple retrieved;
+      while ((retrieved=scanner.next()) != null) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), retrieved.get(i));
+        }
+      }
+    } finally {
+      // Release the scanner even when a value comparison fails.
+      scanner.close();
+    }
+
+    // Input statistics are compared after the scanner has been closed, as in the sibling tests.
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+  }
+
+  /**
+   * Writes a single 13-column tuple to an RCFile table that uses
+   * {@link BinarySerializerDeserializer}, reads it back and compares the values as well as the
+   * byte and row statistics of the appender and the scanner.
+   *
+   * <p>Does nothing when the store type under test is not RCFILE.
+   */
+  @Test
+  public void testRCFileBinarySerializeDeserialize() throws IOException {
+    if (storeType != StoreType.RCFILE) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, tableOptions);
+    meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path dataPath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender writer = storageManager.getAppender(meta, schema, dataPath);
+    writer.enableStats();
+    writer.init();
+
+    QueryId sampleQueryId = new QueryId("12345", 5);
+    ProtobufDatumFactory protoFactory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One value per schema column, in schema order.
+    Tuple expected = new VTuple(13);
+    Datum[] expectedValues = new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        protoFactory.createDatum(sampleQueryId.getProto())
+    };
+    expected.put(expectedValues);
+    writer.addTuple(expected);
+    writer.flush();
+    writer.close();
+
+    // The appender's byte count must match the length of the file on the file system.
+    FileStatus fileStatus = fs.getFileStatus(dataPath);
+    long fileLength = fileStatus.getLen();
+    assertEquals(writer.getStats().getNumBytes().longValue(), fileLength);
+
+    FileFragment wholeFile = new FileFragment("table", dataPath, 0, fileStatus.getLen());
+    Scanner reader = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, wholeFile);
+    reader.init();
+
+    for (Tuple actual = reader.next(); actual != null; actual = reader.next()) {
+      for (int col = 0; col < expected.size(); col++) {
+        assertEquals(expected.get(col), actual.get(col));
+      }
+    }
+    reader.close();
+
+    // Statistics of the write side and the read side must agree.
+    assertEquals(writer.getStats().getNumBytes().longValue(), reader.getInputStats().getNumBytes().longValue());
+    assertEquals(writer.getStats().getNumRows().longValue(), reader.getInputStats().getNumRows().longValue());
+  }
+
+  /**
+   * Writes a single 13-column tuple to a SequenceFile table that uses
+   * {@link TextSerializerDeserializer}, checks that the scanner's key is a {@link LongWritable},
+   * then reads the tuple back and compares the values and the byte/row statistics.
+   *
+   * <p>Does nothing when the store type under test is not SEQUENCEFILE.
+   */
+  @Test
+  public void testSequenceFileTextSerializeDeserialize() throws IOException {
+    if (storeType != StoreType.SEQUENCEFILE) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, tableOptions);
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path dataPath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender writer = storageManager.getAppender(meta, schema, dataPath);
+    writer.enableStats();
+    writer.init();
+
+    QueryId sampleQueryId = new QueryId("12345", 5);
+    ProtobufDatumFactory protoFactory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One value per schema column, in schema order.
+    Tuple expected = new VTuple(13);
+    Datum[] expectedValues = new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        protoFactory.createDatum(sampleQueryId.getProto())
+    };
+    expected.put(expectedValues);
+    writer.addTuple(expected);
+    writer.flush();
+    writer.close();
+
+    // The appender's byte count must match the length of the file on the file system.
+    FileStatus fileStatus = fs.getFileStatus(dataPath);
+    long fileLength = fileStatus.getLen();
+    assertEquals(writer.getStats().getNumBytes().longValue(), fileLength);
+
+    FileFragment wholeFile = new FileFragment("table", dataPath, 0, fileStatus.getLen());
+    Scanner reader = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, wholeFile);
+    reader.init();
+
+    // With the text serde the scanner is expected to expose a LongWritable key.
+    assertTrue(reader instanceof SequenceFileScanner);
+    SequenceFileScanner sequenceReader = (SequenceFileScanner) reader;
+    Writable key = sequenceReader.getKey();
+    assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+
+    for (Tuple actual = reader.next(); actual != null; actual = reader.next()) {
+      for (int col = 0; col < expected.size(); col++) {
+        assertEquals(expected.get(col), actual.get(col));
+      }
+    }
+    reader.close();
+
+    // Statistics of the write side and the read side must agree.
+    assertEquals(writer.getStats().getNumBytes().longValue(), reader.getInputStats().getNumBytes().longValue());
+    assertEquals(writer.getStats().getNumRows().longValue(), reader.getInputStats().getNumRows().longValue());
+  }
+
+  /**
+   * Writes a single 13-column tuple to a SequenceFile table that uses
+   * {@link BinarySerializerDeserializer}, checks that the scanner's key is a
+   * {@link BytesWritable}, then reads the tuple back and compares the values and the byte/row
+   * statistics.
+   *
+   * <p>Does nothing when the store type under test is not SEQUENCEFILE.
+   */
+  @Test
+  public void testSequenceFileBinarySerializeDeserialize() throws IOException {
+    if (storeType != StoreType.SEQUENCEFILE) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, tableOptions);
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path dataPath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender writer = storageManager.getAppender(meta, schema, dataPath);
+    writer.enableStats();
+    writer.init();
+
+    QueryId sampleQueryId = new QueryId("12345", 5);
+    ProtobufDatumFactory protoFactory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One value per schema column, in schema order.
+    Tuple expected = new VTuple(13);
+    Datum[] expectedValues = new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        protoFactory.createDatum(sampleQueryId.getProto())
+    };
+    expected.put(expectedValues);
+    writer.addTuple(expected);
+    writer.flush();
+    writer.close();
+
+    // The appender's byte count must match the length of the file on the file system.
+    FileStatus fileStatus = fs.getFileStatus(dataPath);
+    long fileLength = fileStatus.getLen();
+    assertEquals(writer.getStats().getNumBytes().longValue(), fileLength);
+
+    FileFragment wholeFile = new FileFragment("table", dataPath, 0, fileStatus.getLen());
+    Scanner reader = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, wholeFile);
+    reader.init();
+
+    // With the binary serde the scanner is expected to expose a BytesWritable key.
+    assertTrue(reader instanceof SequenceFileScanner);
+    SequenceFileScanner sequenceReader = (SequenceFileScanner) reader;
+    Writable key = sequenceReader.getKey();
+    assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
+
+    for (Tuple actual = reader.next(); actual != null; actual = reader.next()) {
+      for (int col = 0; col < expected.size(); col++) {
+        assertEquals(expected.get(col), actual.get(col));
+      }
+    }
+    reader.close();
+
+    // Statistics of the write side and the read side must agree.
+    assertEquals(writer.getStats().getNumBytes().longValue(), reader.getInputStats().getNumBytes().longValue());
+    assertEquals(writer.getStats().getNumRows().longValue(), reader.getInputStats().getNumRows().longValue());
+  }
+
+  /**
+   * Round-trips a DATE, a TIME and a TIMESTAMP value through the table file and compares each
+   * column read back with the value written.
+   *
+   * <p>Only runs for the CSV and RAW store types; for every other store type it returns at once.
+   */
+  @Test
+  public void testTime() throws IOException {
+    if (storeType != StoreType.CSV && storeType != StoreType.RAW) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.DATE);
+    schema.addColumn("col2", Type.TIME);
+    schema.addColumn("col3", Type.TIMESTAMP);
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, tableOptions);
+
+    Path dataPath = new Path(testDir, "testTime.data");
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender writer = storageManager.getAppender(meta, schema, dataPath);
+    writer.init();
+
+    Tuple expected = new VTuple(3);
+    Datum[] expectedValues = new Datum[] {
+        DatumFactory.createDate("1980-04-01"),
+        DatumFactory.createTime("12:34:56"),
+        // Current wall-clock time, truncated to whole seconds.
+        DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
+    };
+    expected.put(expectedValues);
+    writer.addTuple(expected);
+    writer.flush();
+    writer.close();
+
+    FileStatus fileStatus = fs.getFileStatus(dataPath);
+    FileFragment wholeFile = new FileFragment("table", dataPath, 0, fileStatus.getLen());
+    Scanner reader = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, wholeFile);
+    reader.init();
+
+    for (Tuple actual = reader.next(); actual != null; actual = reader.next()) {
+      for (int col = 0; col < expected.size(); col++) {
+        assertEquals(expected.get(col), actual.get(col));
+      }
+    }
+    reader.close();
+  }
+
+  /**
+   * Writes 100000 rows while recording appender offsets, then scans the file as a sequence of
+   * fragments delimited by those offsets and checks that the fragments together yield every row.
+   *
+   * <p>Returns immediately when the store type is not seekable. Row and byte statistics are only
+   * compared when {@code statsable} is set.
+   */
+  @Test
+  public void testSeekableScanner() throws IOException {
+    if (!seekable) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    Path dataPath = new Path(testDir, "Seekable.data");
+    FileStorageManager storageManager = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    FileAppender writer = (FileAppender) storageManager.getAppender(meta, schema, dataPath);
+    writer.enableStats();
+    writer.init();
+
+    int tupleNum = 100000;
+    int splitInterval = tupleNum / 3;
+
+    // Fragment boundaries: the start of the file, plus the appender offset taken right after each
+    // row whose index is a multiple of splitInterval.
+    List<Long> boundaries = Lists.newArrayList();
+    boundaries.add(0L);
+    for (int row = 0; row < tupleNum; row++) {
+      VTuple record = new VTuple(3);
+      record.put(0, DatumFactory.createInt4(row + 1));
+      record.put(1, DatumFactory.createInt8(25l));
+      record.put(2, DatumFactory.createText("test" + row));
+      writer.addTuple(record);
+
+      if (row % splitInterval == 0) {
+        boundaries.add(writer.getOffset());
+      }
+    }
+
+    // Make sure the last fragment runs up to the end of the file.
+    long endOffset = writer.getOffset();
+    if (!boundaries.contains(endOffset)) {
+      boundaries.add(endOffset);
+    }
+
+    writer.close();
+    if (statsable) {
+      TableStats written = writer.getStats();
+      assertEquals(tupleNum, written.getNumRows().longValue());
+    }
+
+    FileStatus fileStatus = fs.getFileStatus(dataPath);
+    assertEquals(fileStatus.getLen(), writer.getOffset());
+
+    int scannedTuples = 0;
+    long fragmentStart = 0;
+    long scannedBytes = 0;
+    long scannedRows = 0;
+    for (long fragmentEnd : boundaries) {
+      FileFragment piece = new FileFragment("table", dataPath, fragmentStart, fragmentEnd - fragmentStart);
+      Scanner reader = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, piece, schema);
+      reader.init();
+
+      while (reader.next() != null) {
+        scannedTuples++;
+      }
+
+      reader.close();
+      if (statsable) {
+        scannedBytes += reader.getInputStats().getNumBytes();
+        scannedRows += reader.getInputStats().getNumRows();
+      }
+      // The next fragment starts where this one ended.
+      fragmentStart = fragmentEnd;
+    }
+
+    assertEquals(tupleNum, scannedTuples);
+    if (statsable) {
+      assertEquals(writer.getStats().getNumBytes().longValue(), scannedBytes);
+      assertEquals(writer.getStats().getNumRows().longValue(), scannedRows);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
new file mode 100644
index 0000000..7b83894
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}: an Avro schema must resolve to the
+ * same {@link Schema} whether it is supplied as a literal, as a file path, over HTTP, or through
+ * the file system.
+ */
+public class TestAvroUtil {
+  /** Schema parsed directly from the test resource; the reference for every assertion. */
+  private Schema expected;
+  /** Location of the {@code dataset/testVariousTypes.avsc} test resource. */
+  private URL schemaUrl;
+
+  /** Locates the schema resource and parses it into the reference schema. */
+  @Before
+  public void setUp() throws Exception {
+    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
+    assertNotNull(schemaUrl);
+
+    File schemaFile = new File(schemaUrl.getPath());
+    assertTrue(schemaFile.exists());
+
+    Schema.Parser parser = new Schema.Parser();
+    expected = parser.parse(schemaFile);
+  }
+
+  /**
+   * Resolves the schema through table options three ways: as a schema literal, as a plain path in
+   * the schema URL option, and as an HTTP URL served by a local {@link HttpFileServer}.
+   */
+  @Test
+  public void testGetSchema() throws IOException, URISyntaxException {
+    // 1. Schema text embedded directly in the table options.
+    TableMeta literalMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    literalMeta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
+    Schema fromLiteral = AvroUtil.getAvroSchema(literalMeta, new TajoConf());
+    assertEquals(expected, fromLiteral);
+
+    // 2. Schema referenced by its path.
+    TableMeta pathMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    pathMeta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
+    Schema fromPath = AvroUtil.getAvroSchema(pathMeta, new TajoConf());
+    assertEquals(expected, fromPath);
+
+    // 3. Schema fetched over HTTP from a server bound to an ephemeral local port.
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    Schema fromHttp;
+    try {
+      server.start();
+      InetSocketAddress bound = server.getBindAddress();
+
+      String httpUrl = httpUrlFor(bound);
+      TableMeta httpMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+      httpMeta.putOption(StorageConstants.AVRO_SCHEMA_URL, httpUrl);
+      fromHttp = AvroUtil.getAvroSchema(httpMeta, new TajoConf());
+    } finally {
+      server.stop();
+    }
+    assertEquals(expected, fromHttp);
+  }
+
+  /** Fetches the schema straight from a local {@link HttpFileServer}. */
+  @Test
+  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress bound = server.getBindAddress();
+
+      Schema fromHttp = AvroUtil.getAvroSchemaFromHttp(httpUrlFor(bound));
+      assertEquals(expected, fromHttp);
+    } finally {
+      server.stop();
+    }
+  }
+
+  /** Loads the schema through the file system using the resource URL. */
+  @Test
+  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+    String location = schemaUrl.toString();
+    Schema fromFileSystem = AvroUtil.getAvroSchemaFromFileSystem(location, new TajoConf());
+
+    assertEquals(expected, fromFileSystem);
+  }
+
+  /** Builds the loopback HTTP URL for the schema resource on the port the server is bound to. */
+  private String httpUrlFor(InetSocketAddress bound) {
+    return "http://127.0.0.1:" + bound.getPort() + schemaUrl.getPath();
+  }
+}