You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/20 06:36:32 UTC
git commit: TAJO-435: Improve intermediate file. (jinho)
Updated Branches:
refs/heads/master f58f6ee82 -> f08724f9c
TAJO-435: Improve intermediate file. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f08724f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f08724f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f08724f9
Branch: refs/heads/master
Commit: f08724f9c8bb80d93db0d5517fe6351a9a4d6e63
Parents: f58f6ee
Author: jinossy <ji...@gmail.com>
Authored: Fri Dec 20 14:35:53 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Dec 20 14:35:53 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../engine/planner/PhysicalPlannerImpl.java | 4 +-
.../tajo/engine/planner/global/DataChannel.java | 10 +-
.../engine/planner/global/GlobalPlanner.java | 14 +-
.../planner/physical/IndexedStoreExec.java | 6 +-
.../planner/physical/PartitionedStoreExec.java | 4 +-
.../planner/physical/TestPhysicalPlanner.java | 14 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 15 +-
.../java/org/apache/tajo/storage/RawFile.java | 76 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 845 ++++++-------------
11 files changed, 348 insertions(+), 643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cceb1cb..d7872b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-435: Improve intermediate file. (jinho)
+
TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
TAJO-419: Add missing visitor methods of AlgebraVisitor and
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index fb1c29b..5420692 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -136,6 +136,7 @@ public class TajoConf extends YarnConfiguration {
//////////////////////////////////
PULLSERVER_PORT("tajo.pullserver.port", 0),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
+ SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
//////////////////////////////////
// Storage Configuration
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 73395a6..5120106 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -90,7 +90,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
- storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+ if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
storeTableNode.setInSchema(plan.getOutSchema());
storeTableNode.setOutSchema(plan.getOutSchema());
if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
@@ -773,7 +773,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
List<FileFragment> fragments =
- FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
+ FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 0401718..556c7ff 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -36,7 +36,7 @@ public class DataChannel {
private Schema schema;
- private StoreType storeType = StoreType.CSV;
+ private StoreType storeType = StoreType.RAW;
public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
this.srcId = srcId;
@@ -77,6 +77,10 @@ public class DataChannel {
if (proto.hasPartitionNum()) {
this.partitionNum = proto.getPartitionNum();
}
+
+ if (proto.hasStoreType()) {
+ this.storeType = proto.getStoreType();
+ }
}
public ExecutionBlockId getSrcId() {
@@ -163,6 +167,10 @@ public class DataChannel {
if (partitionNum != null) {
builder.setPartitionNum(partitionNum);
}
+
+ if(storeType != null){
+ builder.setStoreType(storeType);
+ }
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index abf5620..ea3c366 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -43,12 +44,13 @@ public class GlobalPlanner {
private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
private TajoConf conf;
- private AbstractStorageManager sm;
+ private CatalogProtos.StoreType storeType;
public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
throws IOException {
this.conf = conf;
- this.sm = sm;
+ this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
+ Preconditions.checkArgument(storeType != null);
}
public class GlobalPlanContext {
@@ -76,6 +78,7 @@ public class GlobalPlanner {
if (childExecBlock.getPlan() != null) {
ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
+ dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
dataChannel.setSchema(lastNode.getOutSchema());
masterPlan.addConnect(dataChannel);
masterPlan.setTerminal(terminalBlock);
@@ -99,6 +102,7 @@ public class GlobalPlanner {
ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+ channel.setStoreType(storeType);
if (join.getJoinType() != JoinType.CROSS) {
Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
@@ -213,6 +217,7 @@ public class GlobalPlanner {
channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
channel.setPartitionKey(groupbyNode.getGroupingColumns());
channel.setSchema(topMostOfFirstPhase.getOutSchema());
+ channel.setStoreType(storeType);
// setup current block with channel
ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
@@ -271,6 +276,7 @@ public class GlobalPlanner {
channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
}
channel.setSchema(firstPhaseGroupBy.getOutSchema());
+ channel.setStoreType(storeType);
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
groupbyNode.setChild(scanNode);
@@ -299,6 +305,7 @@ public class GlobalPlanner {
DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
channel.setSchema(firstSortNode.getOutSchema());
+ channel.setStoreType(storeType);
ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
currentNode.setChild(secondScan);
@@ -359,6 +366,8 @@ public class GlobalPlanner {
DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
newChannel.setPartitionKey(new Column[]{});
newChannel.setSchema(node.getOutSchema());
+ newChannel.setStoreType(storeType);
+
ScanNode scanNode = buildInputExecutor(plan, newChannel);
LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
parentLimit.setChild(scanNode);
@@ -464,6 +473,7 @@ public class GlobalPlanner {
for (ExecutionBlock childBlocks : queryBlockBlocks) {
DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+ channel.setStoreType(storeType);
context.plan.addConnect(channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index 596c470..afb4d3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -67,8 +67,8 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
this.comp = new TupleComparator(keySchema, sortSpecs);
Path storeTablePath = new Path(context.getWorkDir(), "output");
LOG.info("Output data directory: " + storeTablePath);
- this.meta = CatalogUtil
- .newTableMeta(CatalogProtos.StoreType.CSV);
+ this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+ context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index 2e53229..bcea189 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
@@ -66,8 +65,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
Preconditions.checkArgument(plan.hasPartitionKey());
this.plan = plan;
- this.meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
+ this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
// about the partitions
this.numPartitions = this.plan.getNumPartitions();
int i = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b4c66a1..1a46af6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -455,7 +455,7 @@ public class TestPhysicalPlanner {
ctx.setDataChannel(dataChannel);
LogicalNode rootNode = optimizer.optimize(plan);
- TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
FileSystem fs = sm.getFileSystem();
@@ -513,7 +513,7 @@ public class TestPhysicalPlanner {
ctx.setDataChannel(dataChannel);
optimizer.optimize(plan);
- TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
@@ -597,7 +597,7 @@ public class TestPhysicalPlanner {
System.out.println(rootNode.toString());
// Set all aggregation functions to the first phase mode
- GroupbyNode groupbyNode = (GroupbyNode) PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+ GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
for (Target target : groupbyNode.getTargets()) {
for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
if (eval instanceof AggregationFunctionCallEval) {
@@ -795,12 +795,13 @@ public class TestPhysicalPlanner {
keySchema, comp);
reader.open();
Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, new Options());
+ TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new Options());
SeekableScanner scanner =
- StorageManagerFactory.getSeekableScanner(conf, meta, employee.getSchema(), outputPath);
+ StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
scanner.init();
int cnt = 0;
+
while(scanner.next() != null) {
cnt++;
}
@@ -814,7 +815,8 @@ public class TestPhysicalPlanner {
long offsets = reader.find(keytuple);
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(0).asChars()));
+
+ assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars()));
assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 43ea5f8..2d97e7a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -35,7 +35,10 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.engine.planner.physical.IndexedStoreExec;
+import org.apache.tajo.engine.planner.physical.MemSortExec;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.planner.physical.ProjectionExec;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -130,9 +133,9 @@ public class TestRangeRetrieverHandler {
FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
- TaskAttemptContext
- ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
new FileFragment[] {frags[0]}, testDir);
+
Expr expr = analyzer.parse(SORT_QUERY[0]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -159,7 +162,8 @@ public class TestRangeRetrieverHandler {
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta, schema,
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new Options());
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
@@ -270,7 +274,8 @@ public class TestRangeRetrieverHandler {
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
+ TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new Options());
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, outputMeta, schema,
StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index e3ddd09..7cce7e5 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -18,7 +18,6 @@
package org.apache.tajo.storage;
-import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.BitArray;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
@@ -58,6 +58,7 @@ public class RawFile {
private static final int RECORD_SIZE = 4;
private boolean eof = false;
private long fileSize;
+ private FileInputStream fis;
public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
super(conf, schema, meta, null);
@@ -74,15 +75,15 @@ public class RawFile {
//Preconditions.checkArgument(FileUtil.isLocalPath(path));
// TODO - to make it unified one.
URI uri = path.toUri();
- RandomAccessFile raf = new RandomAccessFile(new File(uri), "r");
- channel = raf.getChannel();
+ fis = new FileInputStream(new File(uri));
+ channel = fis.getChannel();
fileSize = channel.size();
if (LOG.isDebugEnabled()) {
LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
}
- buffer = ByteBuffer.allocateDirect(65535 * 4);
+ buffer = ByteBuffer.allocateDirect(128 * 1024);
columnTypes = new DataType[schema.getColumnNum()];
for (int i = 0; i < schema.getColumnNum(); i++) {
@@ -103,17 +104,27 @@ public class RawFile {
@Override
public long getNextOffset() throws IOException {
- return channel.position();
+ return channel.position() - buffer.remaining();
}
@Override
public void seek(long offset) throws IOException {
- channel.position(offset);
+ long currentPos = channel.position();
+ if(currentPos < offset && offset < currentPos + buffer.limit()){
+ buffer.position((int)(offset - currentPos));
+ } else {
+ buffer.clear();
+ channel.position(offset);
+ channel.read(buffer);
+ buffer.flip();
+ eof = false;
+ }
}
private boolean fillBuffer() throws IOException {
buffer.compact();
if (channel.read(buffer) == -1) {
+ eof = true;
return false;
} else {
buffer.flip();
@@ -132,18 +143,15 @@ public class RawFile {
}
// backup the buffer state
- int recordOffset = buffer.position();
int bufferLimit = buffer.limit();
-
int recordSize = buffer.getInt();
int nullFlagSize = buffer.getShort();
+
buffer.limit(buffer.position() + nullFlagSize);
nullFlags.fromByteBuffer(buffer);
-
// restore the start of record contents
buffer.limit(bufferLimit);
- buffer.position(recordOffset + headerSize);
-
+ //buffer.position(recordOffset + headerSize);
if (buffer.remaining() < (recordSize - headerSize)) {
if (!fillBuffer()) {
return null;
@@ -249,26 +257,6 @@ public class RawFile {
return tuple;
}
- /**
- * It reads a variable byte array whose length is represented as a variable unsigned integer.
- *
- * @return A byte array read
- */
- private byte [] getColumnBytes() throws IOException {
- byte [] lenBytesLen = new byte[4];
- buffer.mark();
- buffer.get(lenBytesLen);
- CodedInputStream ins = CodedInputStream.newInstance(lenBytesLen);
- int bytesLen = ins.readUInt32(); // get a variable unsigned integer length to be read
- int read = ins.getTotalBytesRead();
- buffer.reset();
- buffer.position(buffer.position() + read);
-
- byte [] rawBytes = new byte[bytesLen];
- buffer.get(rawBytes);
- return rawBytes;
- }
-
@Override
public void reset() throws IOException {
// clear the buffer
@@ -284,6 +272,7 @@ public class RawFile {
public void close() throws IOException {
buffer.clear();
channel.close();
+ fis.close();
}
@Override
@@ -311,6 +300,7 @@ public class RawFile {
private BitArray nullFlags;
private int headerSize = 0;
private static final int RECORD_SIZE = 4;
+ private long pos;
private TableStatistics stats;
@@ -324,13 +314,14 @@ public class RawFile {
File file = new File(path.toUri());
randomAccessFile = new RandomAccessFile(file, "rw");
channel = randomAccessFile.getChannel();
+ pos = 0;
columnTypes = new DataType[schema.getColumnNum()];
for (int i = 0; i < schema.getColumnNum(); i++) {
columnTypes[i] = schema.getColumn(i).getDataType();
}
- buffer = ByteBuffer.allocateDirect(65535);
+ buffer = ByteBuffer.allocateDirect(64 * 1024);
// comput the number of bytes, representing the null flags
@@ -346,7 +337,7 @@ public class RawFile {
@Override
public long getOffset() throws IOException {
- return channel.position();
+ return pos;
}
private void flushBuffer() throws IOException {
@@ -386,8 +377,7 @@ public class RawFile {
// skip the row header
int recordOffset = buffer.position();
- buffer.position(buffer.position() + headerSize);
-
+ buffer.position(recordOffset + headerSize);
// reset the null flags
nullFlags.clear();
for (int i = 0; i < schema.getColumnNum(); i++) {
@@ -496,13 +486,15 @@ public class RawFile {
}
// write a record header
- int pos = buffer.position();
+ int bufferPos = buffer.position();
buffer.position(recordOffset);
- buffer.putInt(pos - recordOffset);
+ buffer.putInt(bufferPos - recordOffset);
byte [] flags = nullFlags.toArray();
buffer.putShort((short) flags.length);
buffer.put(flags);
- buffer.position(pos);
+
+ pos += bufferPos - recordOffset;
+ buffer.position(bufferPos);
if (enabledStats) {
stats.incrementRow();
@@ -512,12 +504,18 @@ public class RawFile {
@Override
public void flush() throws IOException {
flushBuffer();
- channel.force(true);
}
@Override
public void close() throws IOException {
flush();
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+ }
+ channel.close();
randomAccessFile.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index bd152f3..34c362c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -34,12 +34,19 @@ import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class TestBSTIndex {
private TajoConf conf;
private Schema schema;
@@ -50,8 +57,10 @@ public class TestBSTIndex {
private static final String TEST_PATH = "target/test-data/TestIndex";
private Path testDir;
private FileSystem fs;
-
- public TestBSTIndex() {
+ private StoreType storeType;
+
+ public TestBSTIndex(StoreType type) {
+ this.storeType = type;
conf = new TajoConf();
conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
schema = new Schema();
@@ -62,37 +71,45 @@ public class TestBSTIndex {
schema.addColumn(new Column("string", Type.TEXT));
}
-
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {StoreType.CSV},
+ {StoreType.RAW}
+ });
+ }
+
@Before
public void setUp() throws Exception {
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
}
-
+
@Test
- public void testFindValueInCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Path tablePath = new Path(testDir, "FindValueInCSV.csv");
- Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ public void testFindValue() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = new Path(testDir, "testFindValue_" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
+ for (int i = 0; i < TUPLE_NUM; i++) {
+ tuple = new VTuple(5);
+ tuple.put(0, DatumFactory.createInt4(i));
+ tuple.put(1, DatumFactory.createInt8(i));
+ tuple.put(2, DatumFactory.createFloat8(i));
+ tuple.put(3, DatumFactory.createFloat4(i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
+ appender.addTuple(tuple);
+ }
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec [] sortKeys = new SortSpec[2];
+
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
@@ -101,11 +118,11 @@ public class TestBSTIndex {
keySchema.addColumn(new Column("double", Type.FLOAT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
+
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInCSV.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX,
- keySchema, comp);
+ keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
@@ -119,52 +136,54 @@ public class TestBSTIndex {
offset = scanner.getNextOffset();
tuple = scanner.next();
if (tuple == null) break;
-
+
keyTuple.put(0, tuple.get(1));
keyTuple.put(1, tuple.get(2));
creater.write(keyTuple, offset);
}
-
+
creater.flush();
creater.close();
scanner.close();
-
+
tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
reader.open();
scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
scanner.init();
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+ for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(tuple);
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("seek check [" + (i) + " ," +(tuple.get(1).asInt8())+ "]" , (i) == (tuple.get(1).asInt8()));
- assertTrue("seek check [" + (i) + " ," +(tuple.get(2).asFloat8())+"]" , (i) == (tuple.get(2).asFloat8()));
-
+ assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+ assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+
offsets = reader.next();
if (offsets == -1) {
continue;
}
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(1).asInt8()));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
}
+ reader.close();
+ scanner.close();
}
@Test
public void testBuildIndexWithAppender() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "BuildIndexWithAppender.csv");
- FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+ Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
+ FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
tablePath);
appender.init();
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
@@ -175,14 +194,14 @@ public class TestBSTIndex {
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "BuildIndexWithAppender.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
Tuple tuple;
long offset;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+ for (int i = 0; i < TUPLE_NUM; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
@@ -206,20 +225,20 @@ public class TestBSTIndex {
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
keySchema, comp);
reader.open();
SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
scanner.init();
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+ for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(tuple);
scanner.seek(offsets);
tuple = scanner.next();
assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
- assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(2).asFloat8()));
+ assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
offsets = reader.next();
if (offsets == -1) {
@@ -227,34 +246,36 @@ public class TestBSTIndex {
}
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(1).asInt8()));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
}
+ reader.close();
+ scanner.close();
}
-
+
@Test
- public void testFindOmittedValueInCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Path tablePath = StorageUtil.concatPath(testDir, "FindOmittedValueInCSV.csv");
+ public void testFindOmittedValue() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_" + i));
- appender.addTuple(tuple);
- }
+ for (int i = 0; i < TUPLE_NUM; i += 2) {
+ tuple = new VTuple(5);
+ tuple.put(0, DatumFactory.createInt4(i));
+ tuple.put(1, DatumFactory.createInt8(i));
+ tuple.put(2, DatumFactory.createFloat8(i));
+ tuple.put(3, DatumFactory.createFloat4(i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
+ appender.addTuple(tuple);
+ }
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
-
- SortSpec [] sortKeys = new SortSpec[2];
+
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
@@ -263,14 +284,14 @@ public class TestBSTIndex {
keySchema.addColumn(new Column("double", Type.FLOAT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
+
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInCSV.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -280,35 +301,37 @@ public class TestBSTIndex {
offset = scanner.getNextOffset();
tuple = scanner.next();
if (tuple == null) break;
-
+
keyTuple.put(0, tuple.get(1));
keyTuple.put(1, tuple.get(2));
creater.write(keyTuple, offset);
}
-
+
creater.flush();
creater.close();
scanner.close();
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInCSV.idx"), keySchema, comp);
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
+ keySchema, comp);
reader.open();
- for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
+ for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
keyTuple.put(0, DatumFactory.createInt8(i));
keyTuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(keyTuple);
assertEquals(-1, offsets);
}
+ reader.close();
}
-
+
@Test
- public void testFindNextKeyValueInCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ public void testFindNextKeyValue() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "FindNextKeyValueInCSV.csv");
+ Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+ for (int i = 0; i < TUPLE_NUM; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
@@ -322,8 +345,8 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec [] sortKeys = new SortSpec[2];
+
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
@@ -332,14 +355,14 @@ public class TestBSTIndex {
keySchema.addColumn(new Column("long", Type.INT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
+
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -349,24 +372,24 @@ public class TestBSTIndex {
offset = scanner.getNextOffset();
tuple = scanner.next();
if (tuple == null) break;
-
+
keyTuple.put(0, tuple.get(0));
keyTuple.put(1, tuple.get(1));
creater.write(keyTuple, offset);
}
-
+
creater.flush();
creater.close();
scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"),
+
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
scanner.init();
Tuple result;
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+ for (int i = 0; i < TUPLE_NUM - 1; i++) {
keyTuple = new VTuple(2);
keyTuple.put(0, DatumFactory.createInt4(i));
keyTuple.put(1, DatumFactory.createInt8(i));
@@ -375,28 +398,30 @@ public class TestBSTIndex {
result = scanner.next();
assertTrue("[seek check " + (i + 1) + " ]",
(i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
-
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+
offsets = reader.next();
if (offsets == -1) {
continue;
}
scanner.seek(offsets);
result = scanner.next();
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8()));
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8()));
+ assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8()));
+ assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8()));
}
+ reader.close();
+ scanner.close();
}
-
+
@Test
- public void testFindNextKeyOmittedValueInCSV() throws IOException {
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ public void testFindNextKeyOmittedValue() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
- Path tablePath = new Path(testDir, "FindNextKeyOmittedValueInCSV.csv");
+ Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i+=2) {
+ for (int i = 0; i < TUPLE_NUM; i += 2) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
@@ -410,8 +435,8 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-
- SortSpec [] sortKeys = new SortSpec[2];
+
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
@@ -420,167 +445,16 @@ public class TestBSTIndex {
keySchema.addColumn(new Column("long", Type.INT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
- "FindNextKeyOmittedValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInCSV.idx"),
- keySchema, comp);
- reader.open();
- scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
- scanner.init();
-
- Tuple result;
- for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
- }
- }
-
- /*
- @Test
- public void testFindValueInRaw() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- appender.close();
- FileStatus status = sm.listTableFiles("table1")[0];
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
- sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", DataType.LONG));
- keySchema.addColumn(new Column("double", DataType.DOUBLE));
-
- TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInRawBSTIndex.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInRawBSTIndex.idx"), keySchema, comp);
- reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple, false);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(1).asLong()));
- assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(2).asDouble()));
- }
- }
-
- @Test
- public void testFindOmittedValueInRaw() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
- sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", DataType.LONG));
- keySchema.addColumn(new Column("double", DataType.DOUBLE));
-
- TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInRaw.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
Tuple keyTuple;
long offset;
while (true) {
@@ -588,305 +462,78 @@ public class TestBSTIndex {
offset = scanner.getNextOffset();
tuple = scanner.next();
if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(1));
- keyTuple.put(1, tuple.get(2));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
- scanner.close();
-
- tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInRaw.idx"),
- keySchema, comp);
- reader.open();
- for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
- tuple.put(0, DatumFactory.createInt8(i));
- tuple.put(1, DatumFactory.createFloat8(i));
- long offsets = reader.find(tuple, false);
- assertEquals(-1, offsets);
- }
- }
-
- @Test
- public void testFindNextKeyValueInRaw() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
-
- TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInRaw.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
keyTuple.put(0, tuple.get(0));
keyTuple.put(1, tuple.get(1));
creater.write(keyTuple, offset);
}
-
- creater.flush();
- creater.close();
- scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInRaw.idx"), keySchema, comp);
- reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple result;
- for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(i));
- keyTuple.put(1, DatumFactory.createInt8(i));
- long offsets = reader.find(keyTuple, true);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]",
- (i + 1) == (result.get(0).asInt()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asLong()));
-
- offsets = reader.next();
- if (offsets == -1) {
- continue;
- }
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asLong()));
- assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asDouble()));
- }
- }
-
- @Test
- public void testFindNextKeyOmittedValueInRaw() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i+=2) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
- appender.addTuple(tuple);
- }
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
-
- TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
creater.flush();
creater.close();
scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple result;
- for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
+ for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
keyTuple = new VTuple(2);
keyTuple.put(0, DatumFactory.createInt4(i));
keyTuple.put(1, DatumFactory.createInt8(i));
long offsets = reader.find(keyTuple, true);
scanner.seek(offsets);
result = scanner.next();
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt()));
- assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asLong()));
- }
- }
-
- @Test
- public void testNextInRaw() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
- Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
- tuple = new VTuple(5);
- tuple.put(0, DatumFactory.createInt4(i));
- tuple.put(1, DatumFactory.createInt8(i));
- tuple.put(2, DatumFactory.createFloat8(i));
- tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
- appender.addTuple(tuple);
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
}
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
- long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-
- SortSpec [] sortKeys = new SortSpec[2];
- sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
- sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
- Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
-
- TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
- BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- creater.setLoadNum(LOAD_NUM);
- creater.open();
-
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple keyTuple;
- long offset;
- while (true) {
- keyTuple = new VTuple(2);
- offset = scanner.getNextOffset();
- tuple = scanner.next();
- if (tuple == null) break;
-
- keyTuple.put(0, tuple.get(0));
- keyTuple.put(1, tuple.get(1));
- creater.write(keyTuple, offset);
- }
-
- creater.flush();
- creater.close();
scanner.close();
-
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
- keySchema, comp);
- reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- Tuple result;
-
- keyTuple = new VTuple(2);
- keyTuple.put(0, DatumFactory.createInt4(0));
- keyTuple.put(1, DatumFactory.createInt8(0));
- long offsets = reader.find(keyTuple);
- scanner.seek(offsets);
- result = scanner.next();
- assertTrue("[seek check " + 0 + " ]" , (0) == (result.get(0).asInt()));
- assertTrue("[seek check " + 0 + " ]" , (0) == (result.get(1).asLong()));
-
- for (int i = 1; i < TUPLE_NUM; i++) {
- offsets = reader.next();
-
- scanner.seek(offsets);
- result = scanner.next();
- assertEquals(i, result.get(0).asInt());
- assertEquals(i, result.get(1).asLong());
- }
}
@Test
public void testFindMinValue() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
Tuple tuple;
- for(int i = 5 ; i < TUPLE_NUM + 5; i ++ ) {
+ for (int i = 5; i < TUPLE_NUM + 5; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
+ FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", DataType.LONG));
- keySchema.addColumn(new Column("double", DataType.DOUBLE));
+ keySchema.addColumn(new Column("long", Type.INT8));
+ keySchema.addColumn(new Column("double", Type.FLOAT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "Test.idx"), BSTIndex.TWO_LEVEL_INDEX,
- keySchema, comp);
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+ BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -905,62 +552,71 @@ public class TestBSTIndex {
scanner.close();
tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "Test.idx"), keySchema, comp);
+
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+ keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
tuple.put(0, DatumFactory.createInt8(0));
tuple.put(1, DatumFactory.createFloat8(0));
offset = reader.find(tuple);
assertEquals(-1, offset);
- offset = reader.find(tuple , true);
+ offset = reader.find(tuple, true);
assertTrue(offset >= 0);
scanner.seek(offset);
tuple = scanner.next();
- assertEquals(5, tuple.get(1).asInt());
- assertEquals(5l, tuple.get(2).asLong());
+ assertEquals(5, tuple.get(1).asInt4());
+ assertEquals(5l, tuple.get(2).asInt8());
+ reader.close();
+ scanner.close();
}
@Test
public void testMinMax() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
+ meta = CatalogUtil.newTableMeta(storeType);
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
+ Path tablePath = new Path(testDir, "testMinMax_" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
Tuple tuple;
- for(int i = 5 ; i < TUPLE_NUM; i ++ ) {
+ for (int i = 5; i < TUPLE_NUM; i += 2) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
- FileStatus status = sm.listTableFiles("table1")[0];
+ FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
+ keySchema.addColumn(new Column("int", Type.INT4));
+ keySchema.addColumn(new Column("long", Type.INT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -978,17 +634,18 @@ public class TestBSTIndex {
creater.close();
scanner.close();
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
keySchema, comp);
reader.open();
Tuple min = reader.getFirstKey();
- assertEquals(5, min.get(0).asInt());
- assertEquals(5l, min.get(0).asLong());
+ assertEquals(5, min.get(0).asInt4());
+ assertEquals(5l, min.get(0).asInt8());
Tuple max = reader.getLastKey();
- assertEquals(TUPLE_NUM - 1, max.get(0).asInt());
- assertEquals(TUPLE_NUM - 1, max.get(0).asLong());
+ assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
+ assertEquals(TUPLE_NUM - 1, max.get(0).asInt8());
+ reader.close();
}
private class ConcurrentAccessor implements Runnable {
@@ -1024,43 +681,47 @@ public class TestBSTIndex {
@Test
public void testConcurrentAccess() throws IOException, InterruptedException {
- meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
Tuple tuple;
- for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+ for (int i = 0; i < TUPLE_NUM; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
- FileStatus status = sm.listTableFiles("table1")[0];
+ FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
+ keySchema.addColumn(new Column("int", Type.INT4));
+ keySchema.addColumn(new Column("long", Type.INT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "ConcurrentAccess.idx"),
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -1078,12 +739,12 @@ public class TestBSTIndex {
creater.close();
scanner.close();
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "ConcurrentAccess.idx"),
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- Thread [] threads = new Thread[5];
- ConcurrentAccessor [] accs = new ConcurrentAccessor[5];
+ Thread[] threads = new Thread[5];
+ ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
for (int i = 0; i < threads.length; i++) {
accs[i] = new ConcurrentAccessor(reader);
threads[i] = new Thread(accs[i]);
@@ -1094,49 +755,54 @@ public class TestBSTIndex {
threads[i].join();
assertFalse(accs[i].isFailed());
}
+ reader.close();
}
+
@Test
- public void testFindValueInCSVDescOrder() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+ public void testFindValueDescOrder() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
Tuple tuple;
- for(int i = (TUPLE_NUM - 1); i >= 0; i -- ) {
+ for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
- appender.close();
-
- FileStatus status = sm.listTableFiles("table1")[0];
+ FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), false, false);
Schema keySchema = new Schema();
- keySchema.addColumn(new Column("long", DataType.LONG));
- keySchema.addColumn(new Column("double", DataType.DOUBLE));
+ keySchema.addColumn(new Column("long", Type.INT8));
+ keySchema.addColumn(new Column("double", Type.FLOAT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX,
- keySchema, comp);
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+ BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -1155,17 +821,21 @@ public class TestBSTIndex {
scanner.close();
tuple = new VTuple(keySchema.getColumnNum());
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
+
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+ keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
- for(int i = (TUPLE_NUM - 1) ; i > 0 ; i --) {
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
+ for (int i = (TUPLE_NUM - 1); i > 0; i--) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(tuple);
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("seek check [" + (i) + " ," +(tuple.get(1).asLong())+ "]" , (i) == (tuple.get(1).asLong()));
- assertTrue("seek check [" + (i) + " ," +(tuple.get(2).asDouble())+"]" , (i) == (tuple.get(2).asDouble()));
+ assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+ assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
offsets = reader.next();
if (offsets == -1) {
@@ -1173,50 +843,56 @@ public class TestBSTIndex {
}
scanner.seek(offsets);
tuple = scanner.next();
- assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (tuple.get(0).asInt()));
- assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (tuple.get(1).asLong()));
+ assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4()));
+ assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8()));
}
+ reader.close();
+ scanner.close();
}
@Test
- public void testFindNextKeyValueInCSVDescOrder() throws IOException {
- meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+ public void testFindNextKeyValueDescOrder() throws IOException {
+ meta = CatalogUtil.newTableMeta(storeType);
+
+ Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
- sm.initTableBase(meta, "table1");
- Appender appender = sm.getAppender(meta, "table1", "table1.csv");
Tuple tuple;
- for(int i = (TUPLE_NUM - 1); i >= 0; i --) {
+ for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
- tuple.put(4, DatumFactory.createText("field_"+i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
- FileStatus status = sm.listTableFiles("table1")[0];
+ FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
- SortSpec [] sortKeys = new SortSpec[2];
+ SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), false, false);
sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), false, false);
Schema keySchema = new Schema();
- keySchema.addColumn(new Column("int", DataType.INT));
- keySchema.addColumn(new Column("long", DataType.LONG));
+ keySchema.addColumn(new Column("int", Type.INT4));
+ keySchema.addColumn(new Column("long", Type.INT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
- BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
+ "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -1234,15 +910,19 @@ public class TestBSTIndex {
creater.close();
scanner.close();
- BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
+
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"),
+ keySchema, comp);
reader.open();
assertEquals(keySchema, reader.getKeySchema());
assertEquals(comp, reader.getComparator());
- scanner = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+ scanner.init();
+
Tuple result;
- for(int i = (TUPLE_NUM - 1) ; i > 0 ; i --) {
+ for (int i = (TUPLE_NUM - 1); i > 0; i--) {
keyTuple = new VTuple(2);
keyTuple.put(0, DatumFactory.createInt4(i));
keyTuple.put(1, DatumFactory.createInt8(i));
@@ -1250,8 +930,8 @@ public class TestBSTIndex {
scanner.seek(offsets);
result = scanner.next();
assertTrue("[seek check " + (i - 1) + " ]",
- (i - 1) == (result.get(0).asInt()));
- assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (result.get(1).asLong()));
+ (i - 1) == (result.get(0).asInt4()));
+ assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
offsets = reader.next();
if (offsets == -1) {
@@ -1259,9 +939,10 @@ public class TestBSTIndex {
}
scanner.seek(offsets);
result = scanner.next();
- assertTrue("[seek check " + (i - 2) + " ]" , (i - 2) == (result.get(0).asLong()));
- assertTrue("[seek check " + (i - 2) + " ]" , (i - 2) == (result.get(1).asDouble()));
+ assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8()));
+ assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8()));
}
+ reader.close();
+ scanner.close();
}
- */
}