You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:31 UTC
[16/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
deleted file mode 100644
index d2cfd82..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBSTIndex {
- private TajoConf conf;
- private Schema schema;
- private TableMeta meta;
-
- private static final int TUPLE_NUM = 10000;
- private static final int LOAD_NUM = 100;
- private static final String TEST_PATH = "target/test-data/TestIndex";
- private Path testDir;
- private FileSystem fs;
- private StoreType storeType;
-
- public TestBSTIndex(StoreType type) {
- this.storeType = type;
- conf = new TajoConf();
- conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
- schema = new Schema();
- schema.addColumn(new Column("int", Type.INT4));
- schema.addColumn(new Column("long", Type.INT8));
- schema.addColumn(new Column("double", Type.FLOAT8));
- schema.addColumn(new Column("float", Type.FLOAT4));
- schema.addColumn(new Column("string", Type.TEXT));
- }
-
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][]{
- {StoreType.CSV},
- {StoreType.RAW}
- });
- }
-
- @Before
- public void setUp() throws Exception {
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Test
- public void testFindValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindValue_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX,
- keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.size());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
- assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testBuildIndexWithAppender() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
- FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
- tablePath);
- appender.init();
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- Tuple tuple;
- long offset;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
-
- offset = appender.getOffset();
- appender.addTuple(tuple);
- creater.write(tuple, offset);
- }
- appender.flush();
- appender.close();
-
- creater.flush();
- creater.close();
-
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- tuple = new VTuple(keySchema.size());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
- assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testFindOmittedValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i += 2) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
- keyTuple.put(0, DatumFactory.createInt8(i));
- keyTuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(keyTuple);
- assertEquals(-1, offsets);
- }
- reader.close();
- }
-
- @Test
- public void testFindNextKeyValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple result;
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]",
- (i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8()));
- assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testFindNextKeyOmittedValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i += 2) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple result;
- for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
- }
- scanner.close();
- }
-
- @Test
- public void testFindMinValue() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- Tuple tuple;
- for (int i = 5; i < TUPLE_NUM + 5; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.size());
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- tuple.put(0, DatumFactory.createInt8(0));
- tuple.put(1, DatumFactory.createFloat8(0));
-
- offset = reader.find(tuple);
- assertEquals(-1, offset);
-
- offset = reader.find(tuple, true);
- assertTrue(offset >= 0);
- scanner.seek(offset);
- tuple = scanner.next();
- assertEquals(5, tuple.get(1).asInt4());
- assertEquals(5l, tuple.get(2).asInt8());
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testMinMax() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testMinMax_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 5; i < TUPLE_NUM; i += 2) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
-
- Tuple min = reader.getFirstKey();
- assertEquals(5, min.get(0).asInt4());
- assertEquals(5l, min.get(0).asInt8());
-
- Tuple max = reader.getLastKey();
- assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
- assertEquals(TUPLE_NUM - 1, max.get(0).asInt8());
- reader.close();
- }
-
- private class ConcurrentAccessor implements Runnable {
- final BSTIndexReader reader;
- final Random rnd = new Random(System.currentTimeMillis());
- boolean failed = false;
-
- ConcurrentAccessor(BSTIndexReader reader) {
- this.reader = reader;
- }
-
- public boolean isFailed() {
- return this.failed;
- }
-
- @Override
- public void run() {
- Tuple findKey = new VTuple(2);
- int keyVal;
- for (int i = 0; i < 10000; i++) {
- keyVal = rnd.nextInt(10000);
- findKey.put(0, DatumFactory.createInt4(keyVal));
- findKey.put(1, DatumFactory.createInt8(keyVal));
- try {
- assertTrue(reader.find(findKey) != -1);
- } catch (Exception e) {
- e.printStackTrace();
- this.failed = true;
- }
- }
- }
- }
-
- @Test
- public void testConcurrentAccess() throws IOException, InterruptedException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
-
- Thread[] threads = new Thread[5];
- ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
- for (int i = 0; i < threads.length; i++) {
- accs[i] = new ConcurrentAccessor(reader);
- threads[i] = new Thread(accs[i]);
- threads[i].start();
- }
-
- for (int i = 0; i < threads.length; i++) {
- threads[i].join();
- assertFalse(accs[i].isFailed());
- }
- reader.close();
- }
-
-
- @Test
- public void testFindValueDescOrder() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- Tuple tuple;
- for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.size());
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- for (int i = (TUPLE_NUM - 1); i > 0; i--) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
- assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8()));
- }
- reader.close();
- scanner.close();
- }
-
- @Test
- public void testFindNextKeyValueDescOrder() throws IOException {
- meta = CatalogUtil.newTableMeta(storeType);
-
- Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
-
- Tuple tuple;
- for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
- "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"),
- keySchema, comp);
- reader.open();
-
- assertEquals(keySchema, reader.getKeySchema());
- assertEquals(comp, reader.getComparator());
-
- scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple result;
- for (int i = (TUPLE_NUM - 1); i > 0; i--) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i - 1) + " ]",
- (i - 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8()));
- assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8()));
- }
- reader.close();
- scanner.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
deleted file mode 100644
index 1081ae9..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.apache.tajo.storage.CSVFile.CSVScanner;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestSingleCSVFileBSTIndex {
-
- private TajoConf conf;
- private Schema schema;
- private TableMeta meta;
- private FileSystem fs;
-
- private static final int TUPLE_NUM = 10000;
- private static final int LOAD_NUM = 100;
- private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex";
- private Path testDir;
-
- public TestSingleCSVFileBSTIndex() {
- conf = new TajoConf();
- conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
- schema = new Schema();
- schema.addColumn(new Column("int", Type.INT4));
- schema.addColumn(new Column("long", Type.INT8));
- schema.addColumn(new Column("double", Type.FLOAT8));
- schema.addColumn(new Column("float", Type.FLOAT4));
- schema.addColumn(new Column("string", Type.TEXT));
- }
-
- @Before
- public void setUp() throws Exception {
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Test
- public void testFindValueInSingleCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
- fs.mkdirs(tablePath.getParent());
-
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for (int i = 0; i < TUPLE_NUM; i++) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec[] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", Type.INT8));
- keySchema.addColumn(new Column("double", Type.FLOAT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
- "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
- fileScanner.init();
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = fileScanner.getNextOffset();
- tuple = fileScanner.next();
- if (tuple == null)
- break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- fileScanner.close();
-
- tuple = new VTuple(keySchema.size());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
- "FindValueInCSV.idx"), keySchema, comp);
- reader.open();
- fileScanner = new CSVScanner(conf, schema, meta, tablet);
- fileScanner.init();
- for (int i = 0; i < TUPLE_NUM - 1; i++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple);
- fileScanner.seek(offsets);
- tuple = fileScanner.next();
- assertEquals(i, (tuple.get(1).asInt8()));
- assertEquals(i, (tuple.get(2).asFloat8()) , 0.01);
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- fileScanner.seek(offsets);
- tuple = fileScanner.next();
- assertTrue("[seek check " + (i + 1) + " ]",
- (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]",
- (i + 1) == (tuple.get(1).asInt8()));
- }
- }
-
- @Test
- public void testFindNextKeyValueInSingleCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
- "table1.csv");
- fs.mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", Type.INT4));
- keySchema.addColumn(new Column("long", Type.INT8));
-
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
- fileScanner.init();
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = fileScanner.getNextOffset();
- tuple = fileScanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- fileScanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
- reader.open();
- fileScanner = new CSVScanner(conf, schema, meta, tablet);
- fileScanner.init();
- Tuple result;
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- fileScanner.seek(offsets);
- result = fileScanner.next();
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- fileScanner.seek(offsets);
- result = fileScanner.next();
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8()));
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
deleted file mode 100644
index 0a01dc4..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import com.google.common.base.Charsets;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
-public class TestReadWrite {
- private static final String HELLO = "hello";
-
- private Path createTmpFile() throws IOException {
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
-
- // it prevents accessing HDFS namenode of TajoTestingCluster.
- LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration());
- return localFS.makeQualified(new Path(tmp.getPath()));
- }
-
- private Schema createAllTypesSchema() {
- List<Column> columns = new ArrayList<Column>();
- columns.add(new Column("myboolean", Type.BOOLEAN));
- columns.add(new Column("mybit", Type.BIT));
- columns.add(new Column("mychar", Type.CHAR));
- columns.add(new Column("myint2", Type.INT2));
- columns.add(new Column("myint4", Type.INT4));
- columns.add(new Column("myint8", Type.INT8));
- columns.add(new Column("myfloat4", Type.FLOAT4));
- columns.add(new Column("myfloat8", Type.FLOAT8));
- columns.add(new Column("mytext", Type.TEXT));
- columns.add(new Column("myblob", Type.BLOB));
- columns.add(new Column("mynull", Type.NULL_TYPE));
- Column[] columnsArray = new Column[columns.size()];
- columnsArray = columns.toArray(columnsArray);
- return new Schema(columnsArray);
- }
-
- @Test
- public void testAll() throws Exception {
- Path file = createTmpFile();
- Schema schema = createAllTypesSchema();
- Tuple tuple = new VTuple(schema.size());
- tuple.put(0, DatumFactory.createBool(true));
- tuple.put(1, DatumFactory.createBit((byte)128));
- tuple.put(2, DatumFactory.createChar('t'));
- tuple.put(3, DatumFactory.createInt2((short)2048));
- tuple.put(4, DatumFactory.createInt4(4096));
- tuple.put(5, DatumFactory.createInt8(8192L));
- tuple.put(6, DatumFactory.createFloat4(0.2f));
- tuple.put(7, DatumFactory.createFloat8(4.1));
- tuple.put(8, DatumFactory.createText(HELLO));
- tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8)));
- tuple.put(10, NullDatum.get());
-
- TajoParquetWriter writer = new TajoParquetWriter(file, schema);
- writer.write(tuple);
- writer.close();
-
- TajoParquetReader reader = new TajoParquetReader(file, schema);
- tuple = reader.read();
-
- assertNotNull(tuple);
- assertEquals(true, tuple.getBool(0));
- assertEquals((byte)128, tuple.getByte(1));
- assertTrue(String.valueOf('t').equals(String.valueOf(tuple.getChar(2))));
- assertEquals((short)2048, tuple.getInt2(3));
- assertEquals(4096, tuple.getInt4(4));
- assertEquals(8192L, tuple.getInt8(5));
- assertEquals(new Float(0.2f), new Float(tuple.getFloat4(6)));
- assertEquals(new Double(4.1), new Double(tuple.getFloat8(7)));
- assertTrue(HELLO.equals(tuple.getText(8)));
- assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), tuple.getBytes(9));
- assertEquals(NullDatum.get(), tuple.get(10));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
deleted file mode 100644
index 49a162b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes.Type;
-
-import org.junit.Test;
-
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link TajoSchemaConverter}.
- */
-public class TestSchemaConverter {
- private static final String ALL_PARQUET_SCHEMA =
- "message table_schema {\n" +
- " optional boolean myboolean;\n" +
- " optional int32 myint;\n" +
- " optional int64 mylong;\n" +
- " optional float myfloat;\n" +
- " optional double mydouble;\n" +
- " optional binary mybytes;\n" +
- " optional binary mystring (UTF8);\n" +
- " optional fixed_len_byte_array(1) myfixed;\n" +
- "}\n";
-
- private static final String CONVERTED_ALL_PARQUET_SCHEMA =
- "message table_schema {\n" +
- " optional boolean myboolean;\n" +
- " optional int32 mybit;\n" +
- " optional binary mychar (UTF8);\n" +
- " optional int32 myint2;\n" +
- " optional int32 myint4;\n" +
- " optional int64 myint8;\n" +
- " optional float myfloat4;\n" +
- " optional double myfloat8;\n" +
- " optional binary mytext (UTF8);\n" +
- " optional binary myblob;\n" +
- // NULL_TYPE fields are not encoded.
- " optional binary myinet4;\n" +
- " optional binary myprotobuf;\n" +
- "}\n";
-
- private Schema createAllTypesSchema() {
- List<Column> columns = new ArrayList<Column>();
- columns.add(new Column("myboolean", Type.BOOLEAN));
- columns.add(new Column("mybit", Type.BIT));
- columns.add(new Column("mychar", Type.CHAR));
- columns.add(new Column("myint2", Type.INT2));
- columns.add(new Column("myint4", Type.INT4));
- columns.add(new Column("myint8", Type.INT8));
- columns.add(new Column("myfloat4", Type.FLOAT4));
- columns.add(new Column("myfloat8", Type.FLOAT8));
- columns.add(new Column("mytext", Type.TEXT));
- columns.add(new Column("myblob", Type.BLOB));
- columns.add(new Column("mynull", Type.NULL_TYPE));
- columns.add(new Column("myinet4", Type.INET4));
- columns.add(new Column("myprotobuf", Type.PROTOBUF));
- Column[] columnsArray = new Column[columns.size()];
- columnsArray = columns.toArray(columnsArray);
- return new Schema(columnsArray);
- }
-
- private Schema createAllTypesConvertedSchema() {
- List<Column> columns = new ArrayList<Column>();
- columns.add(new Column("myboolean", Type.BOOLEAN));
- columns.add(new Column("myint", Type.INT4));
- columns.add(new Column("mylong", Type.INT8));
- columns.add(new Column("myfloat", Type.FLOAT4));
- columns.add(new Column("mydouble", Type.FLOAT8));
- columns.add(new Column("mybytes", Type.BLOB));
- columns.add(new Column("mystring", Type.TEXT));
- columns.add(new Column("myfixed", Type.BLOB));
- Column[] columnsArray = new Column[columns.size()];
- columnsArray = columns.toArray(columnsArray);
- return new Schema(columnsArray);
- }
-
- private void testTajoToParquetConversion(
- Schema tajoSchema, String schemaString) throws Exception {
- TajoSchemaConverter converter = new TajoSchemaConverter();
- MessageType schema = converter.convert(tajoSchema);
- MessageType expected = MessageTypeParser.parseMessageType(schemaString);
- assertEquals("converting " + schema + " to " + schemaString,
- expected.toString(), schema.toString());
- }
-
- private void testParquetToTajoConversion(
- Schema tajoSchema, String schemaString) throws Exception {
- TajoSchemaConverter converter = new TajoSchemaConverter();
- Schema schema = converter.convert(
- MessageTypeParser.parseMessageType(schemaString));
- assertEquals("converting " + schemaString + " to " + tajoSchema,
- tajoSchema.toString(), schema.toString());
- }
-
- @Test
- public void testAllTypesToParquet() throws Exception {
- Schema schema = createAllTypesSchema();
- testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA);
- }
-
- @Test
- public void testAllTypesToTajo() throws Exception {
- Schema schema = createAllTypesConvertedSchema();
- testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
deleted file mode 100644
index 7b09937..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.io.IOUtils;
-
-import java.io.*;
-
-/**
- * Holds file metadata including type (regular file, or directory),
- * and the list of blocks that are pointers to the data.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class INode {
-
- enum FileType {
- DIRECTORY, FILE
- }
-
- public static final FileType[] FILE_TYPES = {
- FileType.DIRECTORY,
- FileType.FILE
- };
-
- public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
-
- private FileType fileType;
- private Block[] blocks;
-
- public INode(FileType fileType, Block[] blocks) {
- this.fileType = fileType;
- if (isDirectory() && blocks != null) {
- throw new IllegalArgumentException("A directory cannot contain blocks.");
- }
- this.blocks = blocks;
- }
-
- public Block[] getBlocks() {
- return blocks;
- }
-
- public FileType getFileType() {
- return fileType;
- }
-
- public boolean isDirectory() {
- return fileType == FileType.DIRECTORY;
- }
-
- public boolean isFile() {
- return fileType == FileType.FILE;
- }
-
- public long getSerializedLength() {
- return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
- }
-
-
- public InputStream serialize() throws IOException {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bytes);
- try {
- out.writeByte(fileType.ordinal());
- if (isFile()) {
- out.writeInt(blocks.length);
- for (int i = 0; i < blocks.length; i++) {
- out.writeLong(blocks[i].getId());
- out.writeLong(blocks[i].getLength());
- }
- }
- out.close();
- out = null;
- } finally {
- IOUtils.closeStream(out);
- }
- return new ByteArrayInputStream(bytes.toByteArray());
- }
-
- public static INode deserialize(InputStream in) throws IOException {
- if (in == null) {
- return null;
- }
- DataInputStream dataIn = new DataInputStream(in);
- FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
- switch (fileType) {
- case DIRECTORY:
- in.close();
- return INode.DIRECTORY_INODE;
- case FILE:
- int numBlocks = dataIn.readInt();
- Block[] blocks = new Block[numBlocks];
- for (int i = 0; i < numBlocks; i++) {
- long id = dataIn.readLong();
- long length = dataIn.readLong();
- blocks[i] = new Block(id, length);
- }
- in.close();
- return new INode(fileType, blocks);
- default:
- throw new IllegalArgumentException("Cannot deserialize inode.");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
deleted file mode 100644
index 40decc2..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.tajo.common.exception.NotImplementedException;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-/**
- * A stub implementation of {@link FileSystemStore} for testing
- * {@link S3FileSystem} without actually connecting to S3.
- */
-public class InMemoryFileSystemStore implements FileSystemStore {
-
- private Configuration conf;
- private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
- private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
-
- @Override
- public void initialize(URI uri, Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public String getVersion() throws IOException {
- return "0";
- }
-
- @Override
- public void deleteINode(Path path) throws IOException {
- inodes.remove(normalize(path));
- }
-
- @Override
- public void deleteBlock(Block block) throws IOException {
- blocks.remove(block.getId());
- }
-
- @Override
- public boolean inodeExists(Path path) throws IOException {
- return inodes.containsKey(normalize(path));
- }
-
- @Override
- public boolean blockExists(long blockId) throws IOException {
- return blocks.containsKey(blockId);
- }
-
- @Override
- public INode retrieveINode(Path path) throws IOException {
- return inodes.get(normalize(path));
- }
-
- @Override
- public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
- byte[] data = blocks.get(block.getId());
- File file = createTempFile();
- BufferedOutputStream out = null;
- try {
- out = new BufferedOutputStream(new FileOutputStream(file));
- out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
- } finally {
- if (out != null) {
- out.close();
- }
- }
- return file;
- }
-
- private File createTempFile() throws IOException {
- File dir = new File(conf.get("fs.s3.buffer.dir"));
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException("Cannot create S3 buffer directory: " + dir);
- }
- File result = File.createTempFile("test-", ".tmp", dir);
- result.deleteOnExit();
- return result;
- }
-
- @Override
- public Set<Path> listSubPaths(Path path) throws IOException {
- Path normalizedPath = normalize(path);
- // This is inefficient but more than adequate for testing purposes.
- Set<Path> subPaths = new LinkedHashSet<Path>();
- for (Path p : inodes.tailMap(normalizedPath).keySet()) {
- if (normalizedPath.equals(p.getParent())) {
- subPaths.add(p);
- }
- }
- return subPaths;
- }
-
- @Override
- public Set<Path> listDeepSubPaths(Path path) throws IOException {
- Path normalizedPath = normalize(path);
- String pathString = normalizedPath.toUri().getPath();
- if (!pathString.endsWith("/")) {
- pathString += "/";
- }
- // This is inefficient but more than adequate for testing purposes.
- Set<Path> subPaths = new LinkedHashSet<Path>();
- for (Path p : inodes.tailMap(normalizedPath).keySet()) {
- if (p.toUri().getPath().startsWith(pathString)) {
- subPaths.add(p);
- }
- }
- return subPaths;
- }
-
- @Override
- public void storeINode(Path path, INode inode) throws IOException {
- inodes.put(normalize(path), inode);
- }
-
- @Override
- public void storeBlock(Block block, File file) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buf = new byte[8192];
- int numRead;
- BufferedInputStream in = null;
- try {
- in = new BufferedInputStream(new FileInputStream(file));
- while ((numRead = in.read(buf)) >= 0) {
- out.write(buf, 0, numRead);
- }
- } finally {
- if (in != null) {
- in.close();
- }
- }
- blocks.put(block.getId(), out.toByteArray());
- }
-
- private Path normalize(Path path) {
- if (!path.isAbsolute()) {
- throw new IllegalArgumentException("Path must be absolute: " + path);
- }
- return new Path(path.toUri().getPath());
- }
-
- @Override
- public void purge() throws IOException {
- inodes.clear();
- blocks.clear();
- }
-
- @Override
- public void dump() throws IOException {
- throw new NotImplementedException();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
deleted file mode 100644
index d4034b9..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-class S3OutputStream extends OutputStream {
-
- private Configuration conf;
-
- private int bufferSize;
-
- private FileSystemStore store;
-
- private Path path;
-
- private long blockSize;
-
- private File backupFile;
-
- private OutputStream backupStream;
-
- private Random r = new Random();
-
- private boolean closed;
-
- private int pos = 0;
-
- private long filePos = 0;
-
- private int bytesWrittenToBlock = 0;
-
- private byte[] outBuf;
-
- private List<Block> blocks = new ArrayList<Block>();
-
- private Block nextBlock;
-
- private static final Log LOG =
- LogFactory.getLog(S3OutputStream.class.getName());
-
-
- public S3OutputStream(Configuration conf, FileSystemStore store,
- Path path, long blockSize, Progressable progress,
- int buffersize) throws IOException {
-
- this.conf = conf;
- this.store = store;
- this.path = path;
- this.blockSize = blockSize;
- this.backupFile = newBackupFile();
- this.backupStream = new FileOutputStream(backupFile);
- this.bufferSize = buffersize;
- this.outBuf = new byte[bufferSize];
-
- }
-
- private File newBackupFile() throws IOException {
- File dir = new File(conf.get("fs.s3.buffer.dir"));
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException("Cannot create S3 buffer directory: " + dir);
- }
- File result = File.createTempFile("output-", ".tmp", dir);
- result.deleteOnExit();
- return result;
- }
-
- public long getPos() throws IOException {
- return filePos;
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
-
- if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
- flush();
- }
- outBuf[pos++] = (byte) b;
- filePos++;
- }
-
- @Override
- public synchronized void write(byte b[], int off, int len) throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
- while (len > 0) {
- int remaining = bufferSize - pos;
- int toWrite = Math.min(remaining, len);
- System.arraycopy(b, off, outBuf, pos, toWrite);
- pos += toWrite;
- off += toWrite;
- len -= toWrite;
- filePos += toWrite;
-
- if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
- flush();
- }
- }
- }
-
- @Override
- public synchronized void flush() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
-
- if (bytesWrittenToBlock + pos >= blockSize) {
- flushData((int) blockSize - bytesWrittenToBlock);
- }
- if (bytesWrittenToBlock == blockSize) {
- endBlock();
- }
- flushData(pos);
- }
-
- private synchronized void flushData(int maxPos) throws IOException {
- int workingPos = Math.min(pos, maxPos);
-
- if (workingPos > 0) {
- //
- // To the local block backup, write just the bytes
- //
- backupStream.write(outBuf, 0, workingPos);
-
- //
- // Track position
- //
- bytesWrittenToBlock += workingPos;
- System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
- pos -= workingPos;
- }
- }
-
- private synchronized void endBlock() throws IOException {
- //
- // Done with local copy
- //
- backupStream.close();
-
- //
- // Send it to S3
- //
- // TODO: Use passed in Progressable to report progress.
- nextBlockOutputStream();
- store.storeBlock(nextBlock, backupFile);
- Block[] arr = new Block[blocks.size()];
- arr = blocks.toArray(arr);
- store.storeINode(path, new INode(INode.FILE_TYPES[1], arr));
-
- //
- // Delete local backup, start new one
- //
- boolean b = backupFile.delete();
- if (!b) {
- LOG.warn("Ignoring failed delete");
- }
- backupFile = newBackupFile();
- backupStream = new FileOutputStream(backupFile);
- bytesWrittenToBlock = 0;
- }
-
- private synchronized void nextBlockOutputStream() throws IOException {
- long blockId = r.nextLong();
- while (store.blockExists(blockId)) {
- blockId = r.nextLong();
- }
- nextBlock = new Block(blockId, bytesWrittenToBlock);
- blocks.add(nextBlock);
- bytesWrittenToBlock = 0;
- }
-
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- flush();
- if (filePos == 0 || bytesWrittenToBlock != 0) {
- endBlock();
- }
-
- backupStream.close();
- boolean b = backupFile.delete();
- if (!b) {
- LOG.warn("Ignoring failed delete");
- }
-
- super.close();
-
- closed = true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
deleted file mode 100644
index fc1c908..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SmallBlockS3FileSystem extends S3FileSystem {
-
- private URI uri;
-
- private FileSystemStore store;
-
- private Path workingDir;
-
- static class Holder {
- private static InMemoryFileSystemStore s;
-
- public synchronized static FileSystemStore get() {
- if(s != null) {
- return s;
- }
- s = new InMemoryFileSystemStore();
- return s;
- }
-
- public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) {
- s = inMemoryFileSystemStore;
- }
- }
-
- public SmallBlockS3FileSystem() {
- }
-
-
- public SmallBlockS3FileSystem(
- InMemoryFileSystemStore inMemoryFileSystemStore) {
- Holder.set(inMemoryFileSystemStore);
- this.store = inMemoryFileSystemStore;
- }
-
- @Override
- public URI getUri() {
- return uri;
- }
- @Override
- public long getDefaultBlockSize() {
- return 10;
- }
-
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- if (store == null) {
- store = Holder.get();
- }
- store.initialize(uri, conf);
- setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- this.workingDir =
- new Path("/user", System.getProperty("user.name")).makeQualified(this);
- }
- @Override
- public boolean isFile(Path path) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(path));
- if (inode == null) {
- return false;
- }
- return inode.isFile();
- }
-
- private INode checkFile(Path path) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(path));
- if (inode == null) {
- throw new IOException("No such file.");
- }
- if (inode.isDirectory()) {
- throw new IOException("Path " + path + " is a directory.");
- }
- return inode;
- }
-
- @Override
- public FileStatus[] listStatus(Path f) throws IOException {
- Path absolutePath = makeAbsolute(f);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- throw new FileNotFoundException("File " + f + " does not exist.");
- }
- if (inode.isFile()) {
- return new FileStatus[] {
- new S3FileStatus(f.makeQualified(this), inode)
- };
- }
- ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
- for (Path p : store.listSubPaths(absolutePath)) {
- ret.add(getFileStatus(p.makeQualified(this)));
- }
- return ret.toArray(new FileStatus[0]);
- }
- @Override
- public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize,
- short replication, long blockSize, Progressable progress)
- throws IOException {
-
- INode inode = store.retrieveINode(makeAbsolute(file));
- if (inode != null) {
- if (overwrite) {
- delete(file, true);
- } else {
- throw new IOException("File already exists: " + file);
- }
- } else {
- Path parent = file.getParent();
- if (parent != null) {
- if (!mkdirs(parent)) {
- throw new IOException("Mkdirs failed to create " + parent.toString());
- }
- }
- }
- return new FSDataOutputStream
- (new S3OutputStream(getConf(), store, makeAbsolute(file),
- blockSize, progress, bufferSize),
- statistics);
- }
- @Override
- public boolean mkdirs(Path path, FsPermission permission) throws IOException {
- Path absolutePath = makeAbsolute(path);
- List<Path> paths = new ArrayList<Path>();
- do {
- paths.add(0, absolutePath);
- absolutePath = absolutePath.getParent();
- } while (absolutePath != null);
-
- boolean result = true;
- for (Path p : paths) {
- result &= mkdir(p);
- }
- return result;
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDir;
- }
-
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- Path absoluteSrc = makeAbsolute(src);
- INode srcINode = store.retrieveINode(absoluteSrc);
- if (srcINode == null) {
- // src path doesn't exist
- return false;
- }
- Path absoluteDst = makeAbsolute(dst);
- INode dstINode = store.retrieveINode(absoluteDst);
- if (dstINode != null && dstINode.isDirectory()) {
- absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
- dstINode = store.retrieveINode(absoluteDst);
- }
- if (dstINode != null) {
- // dst path already exists - can't overwrite
- return false;
- }
- Path dstParent = absoluteDst.getParent();
- if (dstParent != null) {
- INode dstParentINode = store.retrieveINode(dstParent);
- if (dstParentINode == null || dstParentINode.isFile()) {
- // dst parent doesn't exist or is a file
- return false;
- }
- }
- return renameRecursive(absoluteSrc, absoluteDst);
- }
-
- private boolean renameRecursive(Path src, Path dst) throws IOException {
- INode srcINode = store.retrieveINode(src);
- store.storeINode(dst, srcINode);
- store.deleteINode(src);
- if (srcINode.isDirectory()) {
- for (Path oldSrc : store.listDeepSubPaths(src)) {
- INode inode = store.retrieveINode(oldSrc);
- if (inode == null) {
- return false;
- }
- String oldSrcPath = oldSrc.toUri().getPath();
- String srcPath = src.toUri().getPath();
- String dstPath = dst.toUri().getPath();
- Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
- store.storeINode(newDst, inode);
- store.deleteINode(oldSrc);
- }
- }
- return true;
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
- Path absolutePath = makeAbsolute(path);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- return false;
- }
- if (inode.isFile()) {
- store.deleteINode(absolutePath);
- for (Block block: inode.getBlocks()) {
- store.deleteBlock(block);
- }
- } else {
- FileStatus[] contents = null;
- try {
- contents = listStatus(absolutePath);
- } catch(FileNotFoundException fnfe) {
- return false;
- }
-
- if ((contents.length !=0) && (!recursive)) {
- throw new IOException("Directory " + path.toString()
- + " is not empty.");
- }
- for (FileStatus p:contents) {
- if (!delete(p.getPath(), recursive)) {
- return false;
- }
- }
- store.deleteINode(absolutePath);
- }
- return true;
- }
-
- /**
- * FileStatus for S3 file systems.
- */
- @Override
- public FileStatus getFileStatus(Path f) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(f));
- if (inode == null) {
- throw new FileNotFoundException(f + ": No such file or directory.");
- }
- return new S3FileStatus(f.makeQualified(this), inode);
- }
- private boolean mkdir(Path path) throws IOException {
- Path absolutePath = makeAbsolute(path);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- store.storeINode(absolutePath, INode.DIRECTORY_INODE);
- } else if (inode.isFile()) {
- throw new IOException(String.format(
- "Can't make directory for path %s since it is a file.",
- absolutePath));
- }
- return true;
- }
- private Path makeAbsolute(Path path) {
- if (path.isAbsolute()) {
- return path;
- }
- return new Path(workingDir, path);
- }
-
- private static class S3FileStatus extends FileStatus {
-
- S3FileStatus(Path f, INode inode) throws IOException {
- super(findLength(inode), inode.isDirectory(), 1,
- findBlocksize(inode), 0, f);
- }
-
- private static long findLength(INode inode) {
- if (!inode.isDirectory()) {
- long length = 0L;
- for (Block block : inode.getBlocks()) {
- length += block.getLength();
- }
- return length;
- }
- return 0;
- }
-
- private static long findBlocksize(INode inode) {
- final Block[] ret = inode.getBlocks();
- return ret == null ? 0L : ret[0].getLength();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
deleted file mode 100644
index b332364..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.tuple.offheap.*;
-import org.junit.Test;
-
-public class TestBaseTupleBuilder {
-
- @Test
- public void testBuild() {
- BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
-
- OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
- OffHeapRowBlockReader reader = rowBlock.getReader();
-
- ZeroCopyTuple inputTuple = new ZeroCopyTuple();
-
- HeapTuple heapTuple = null;
- ZeroCopyTuple zcTuple = null;
- int i = 0;
- while(reader.next(inputTuple)) {
- RowStoreUtil.convert(inputTuple, builder);
-
- heapTuple = builder.buildToHeapTuple();
- TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
-
- zcTuple = builder.buildToZeroCopyTuple();
- TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
-
- i++;
- }
- }
-
- @Test
- public void testBuildWithNull() {
- BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
-
- OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
- OffHeapRowBlockReader reader = rowBlock.getReader();
-
- ZeroCopyTuple inputTuple = new ZeroCopyTuple();
-
- HeapTuple heapTuple = null;
- ZeroCopyTuple zcTuple = null;
- int i = 0;
- while(reader.next(inputTuple)) {
- RowStoreUtil.convert(inputTuple, builder);
-
- heapTuple = builder.buildToHeapTuple();
- TestOffHeapRowBlock.validateNullity(i, heapTuple);
-
- zcTuple = builder.buildToZeroCopyTuple();
- TestOffHeapRowBlock.validateNullity(i, zcTuple);
-
- i++;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
deleted file mode 100644
index 96f465a..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.catalog.SchemaUtil;
-import org.junit.Test;
-
-public class TestHeapTuple {
-
- @Test
- public void testHeapTuple() {
- OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
-
- OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- int i = 0;
- while (reader.next(zcTuple)) {
- byte [] bytes = new byte[zcTuple.nioBuffer().limit()];
- zcTuple.nioBuffer().get(bytes);
-
- HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
- TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
- i++;
- }
-
- rowBlock.release();
- }
-}
\ No newline at end of file