You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/07 05:50:42 UTC
tajo git commit: TAJO-2000: BSTIndex can cause OOM.
Repository: tajo
Updated Branches:
refs/heads/master c770fe75c -> ee6c2b5fe
TAJO-2000: BSTIndex can cause OOM.
Closes #892
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ee6c2b5f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ee6c2b5f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ee6c2b5f
Branch: refs/heads/master
Commit: ee6c2b5fe75753ea0b5b54e833e86e368b7ef3b2
Parents: c770fe7
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Dec 7 13:49:50 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Dec 7 13:49:50 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/storage/BufferPool.java | 30 +-
.../org/apache/tajo/tuple/memory/HeapTuple.java | 15 +
.../apache/tajo/tuple/memory/UnSafeTuple.java | 14 +
.../java/org/apache/tajo/util/FileUtil.java | 22 +
.../planner/physical/BSTIndexScanExec.java | 7 +-
.../physical/RangeShuffleFileWriteExec.java | 11 +-
.../engine/planner/physical/StoreIndexExec.java | 2 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 4 +-
.../tajo/pullserver/TajoPullServerService.java | 162 ++++---
.../apache/tajo/storage/index/bst/BSTIndex.java | 431 +++++++++++++------
.../apache/tajo/storage/index/TestBSTIndex.java | 135 +++++-
.../index/TestSingleCSVFileBSTIndex.java | 8 +-
13 files changed, 595 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f1afd9e..e54e661 100644
--- a/CHANGES
+++ b/CHANGES
@@ -57,6 +57,8 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2000: BSTIndex can cause OOM. (jinho)
+
TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung)
TAJO-1993: Table Timezone doesn't work when Timezone is not exactly same.(DaeMyung)
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 4913d3b..7c4e288 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -94,17 +94,39 @@ public class BufferPool {
public static ByteBuf directBuffer(int size) {
- return ALLOCATOR.directBuffer(size);
+ return directBuffer(size, ByteOrder.LITTLE_ENDIAN);
+ }
+
+ /**
+ * @param size the initial capacity
+ * @param order the endianness
+ * @return allocated ByteBuf from pool
+ */
+ public static ByteBuf directBuffer(int size, ByteOrder order) {
+ ByteBuf byteBuf = ALLOCATOR.directBuffer(size);
+ if (byteBuf.order() != order) byteBuf.order(order);
+ return byteBuf;
}
/**
- *
* @param size the initial capacity
- * @param max the max capacity
+ * @param max the max capacity
* @return allocated ByteBuf from pool
*/
public static ByteBuf directBuffer(int size, int max) {
- return ALLOCATOR.directBuffer(size, max).order(ByteOrder.LITTLE_ENDIAN);
+ return directBuffer(size, max, ByteOrder.LITTLE_ENDIAN);
+ }
+
+ /**
+ * @param size the initial capacity
+ * @param max the max capacity
+ * @param order the endianness
+ * @return allocated ByteBuf from pool
+ */
+ public static ByteBuf directBuffer(int size, int max, ByteOrder order) {
+ ByteBuf byteBuf = ALLOCATOR.directBuffer(size, max);
+ if (byteBuf.order() != order) byteBuf.order(order);
+ return byteBuf;
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
index c6c7daf..330b363 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
@@ -33,6 +33,7 @@ import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.datetime.TimeMeta;
import java.nio.ByteOrder;
+import java.util.Arrays;
import static org.apache.tajo.common.TajoDataTypes.DataType;
@@ -296,6 +297,20 @@ public class HeapTuple extends ZeroCopyTuple implements Cloneable {
}
@Override
+ public int hashCode() {
+ return Arrays.hashCode(getValues());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tuple) {
+ Tuple other = (Tuple) obj;
+ return Arrays.equals(getValues(), other.getValues());
+ }
+ return false;
+ }
+
+ @Override
public Tuple clone() throws CloneNotSupportedException {
HeapTuple heapTuple = (HeapTuple) super.clone();
heapTuple.buffer = buffer.copy(getRelativePos(), getLength());
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
index 26f7df3..dcff801 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
@@ -36,6 +36,7 @@ import sun.misc.Unsafe;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.util.Arrays;
import static org.apache.tajo.common.TajoDataTypes.DataType;
@@ -338,6 +339,19 @@ public class UnSafeTuple extends ZeroCopyTuple {
}
@Override
+ public int hashCode() {
+ return Arrays.hashCode(getValues());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tuple) {
+ Tuple other = (Tuple) obj;
+ return Arrays.equals(getValues(), other.getValues());
+ }
+ return false;
+ }
+ @Override
public String toString() {
return VTuple.toDisplayString(getValues());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 118f42a..95700d0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -120,4 +120,26 @@ public class FileUtil {
}
}
}
+
+ /**
+ * Closes every non-null Closeable and, if any close fails, <b>throws</b> the first {@link IOException} encountered.
+ * @param closeables the objects to close
+ */
+ public static void cleanupAndthrowIfFailed(java.io.Closeable... closeables) throws IOException {
+ IOException ioe = null;
+
+ for (java.io.Closeable c : closeables) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ if (ioe == null) ioe = e;
+ }
+ }
+ }
+
+ if (ioe != null) {
+ throw ioe;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index ee3762f..89c5b3d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -90,7 +90,6 @@ public class BSTIndexScanExec extends ScanExec {
Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
this.reader = new BSTIndex(context.getConf()).
getIndexReader(indexPath, keySchema, comparator);
- this.reader.open();
}
private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, List<Target> targets, EvalNode qual) {
@@ -101,9 +100,7 @@ public class BSTIndexScanExec extends ScanExec {
qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
}
for (Column column : originalSchema.getRootColumns()) {
- if (subSchema.contains(column)
- || qualAndTargets.contains(column)
- || qualAndTargets.contains(column)) {
+ if (subSchema.contains(column) || qualAndTargets.contains(column)) {
mergedSchema.addColumn(column);
}
}
@@ -127,6 +124,8 @@ public class BSTIndexScanExec extends ScanExec {
@Override
public void init() throws IOException {
+ reader.init();
+
Schema projected;
// in the case where projected column or expression are given
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index bcd2b17..e4217b3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -86,9 +86,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
this.appender.enableStats(keySchema.getAllColumns());
this.appender.init();
this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
- this.indexWriter.setLoadNum(100);
- this.indexWriter.open();
+ BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
+ this.indexWriter.init();
super.init();
}
@@ -121,13 +120,11 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
super.close();
appender.flush();
- IOUtils.cleanup(LOG, appender);
- indexWriter.flush();
- IOUtils.cleanup(LOG, indexWriter);
-
// Collect statistics data
context.setResultStats(appender.getStats());
context.addShuffleFileOutput(0, context.getTaskId().toString());
+ IOUtils.cleanup(LOG, appender);
+ indexWriter.close();
appender = null;
indexWriter = null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index fa9fe3c..c5e1093 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -78,7 +78,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
this.indexWriter.setLoadNum(100);
- this.indexWriter.open();
+ this.indexWriter.init();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 873d9e0..74805ce 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -768,8 +768,8 @@ public class TaskImpl implements Task {
try {
chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("getFileChunks() throws exception");
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 6bcb7b4..ef3d7e0 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -661,109 +661,107 @@ public class TajoPullServerService extends AbstractService {
String endKey,
boolean last) throws IOException {
BSTIndex index = new BSTIndex(new TajoConf());
- BSTIndex.BSTIndexReader idxReader =
- index.getIndexReader(new Path(outDir, "index"));
- idxReader.open();
- Schema keySchema = idxReader.getKeySchema();
- TupleComparator comparator = idxReader.getComparator();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
- }
+ try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) {
+ Schema keySchema = idxReader.getKeySchema();
+ TupleComparator comparator = idxReader.getComparator();
- File data = new File(URI.create(outDir.toUri() + "/output"));
- byte [] startBytes = Base64.decodeBase64(startKey);
- byte [] endBytes = Base64.decodeBase64(endKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+ }
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
- Tuple start;
- Tuple end;
- try {
- start = decoder.toTuple(startBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("StartKey: " + startKey
- + ", decoded byte size: " + startBytes.length, t);
- }
+ File data = new File(URI.create(outDir.toUri() + "/output"));
+ byte[] startBytes = Base64.decodeBase64(startKey);
+ byte[] endBytes = Base64.decodeBase64(endKey);
- try {
- end = decoder.toTuple(endBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("EndKey: " + endKey
- + ", decoded byte size: " + endBytes.length, t);
- }
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+ Tuple start;
+ Tuple end;
+ try {
+ start = decoder.toTuple(startBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("StartKey: " + startKey
+ + ", decoded byte size: " + startBytes.length, t);
+ }
- LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
- (last ? ", last=true" : "") + ")");
+ try {
+ end = decoder.toTuple(endBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("EndKey: " + endKey
+ + ", decoded byte size: " + endBytes.length, t);
+ }
- if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
- LOG.info("There is no contents");
- return null;
- }
+ LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+ (last ? ", last=true" : "") + ")");
- if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
- comparator.compare(idxReader.getLastKey(), start) < 0) {
- LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
- "], but request start:" + start + ", end: " + end);
- return null;
- }
+ if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+ LOG.info("There is no contents");
+ return null;
+ }
- long startOffset;
- long endOffset;
- try {
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
+ if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+ comparator.compare(idxReader.getLastKey(), start) < 0) {
+ LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+ "], but request start:" + start + ", end: " + end);
+ return null;
}
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
+ long startOffset;
+ long endOffset;
try {
- startOffset = idxReader.find(start, true);
+ idxReader.init();
+ startOffset = idxReader.find(start);
} catch (IOException ioe) {
LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ idxReader.getFirstKey() + ", idx max: "
+ idxReader.getLastKey());
throw ioe;
}
- }
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
+ // if startOffset == -1 then case 2-1 or case 3
+ if (startOffset == -1) { // this is a hack
+ // if case 2-1 or case 3
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
- // if greater than indexed values
- if (last || (endOffset == -1
- && comparator.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ }
- idxReader.close();
+ // if greater than indexed values
+ if (last || (endOffset == -1
+ && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+ FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- if(LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
- return chunk;
+ if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+ return chunk;
+ }
}
public static List<String> splitMaps(List<String> mapq) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 3affd50..d212c1c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -18,13 +18,13 @@
package org.apache.tajo.storage.index.bst;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
import org.apache.tajo.storage.*;
@@ -33,12 +33,14 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.index.IndexMethod;
import org.apache.tajo.storage.index.IndexWriter;
import org.apache.tajo.storage.index.OrderIndexReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.util.LinkedList;
-import java.util.Set;
+import java.util.Map;
import java.util.TreeMap;
import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
@@ -55,6 +57,9 @@ public class BSTIndex implements IndexMethod {
public static final int ONE_LEVEL_INDEX = 1;
public static final int TWO_LEVEL_INDEX = 2;
+ public static final int DEFAULT_INDEX_LOAD = 4096;
+ public static final int BUFFER_SIZE = 128 * StorageUnit.KB;
+ public static final String WRITER_INDEX_LOAD = "tajo.executor.index.writer.load-num";
private final Configuration conf;
@@ -62,10 +67,16 @@ public class BSTIndex implements IndexMethod {
this.conf = conf;
}
+
+ public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+ TupleComparator comparator, boolean sorted) throws IOException {
+ return new BSTIndexWriter(fileName, level, keySchema, comparator, sorted);
+ }
+
@Override
public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
- return new BSTIndexWriter(fileName, level, keySchema, comparator);
+ TupleComparator comparator) throws IOException {
+ return getIndexWriter(fileName, level, keySchema, comparator, false);
}
@Override
@@ -78,23 +89,38 @@ public class BSTIndex implements IndexMethod {
}
public class BSTIndexWriter extends IndexWriter implements Closeable {
+ private FileChannel outChannel;
+ private RandomAccessFile outRandomAccessFile;
private FSDataOutputStream out;
- private FileSystem fs;
+ private long filePos;
+
+ private FileChannel rootOutChannel;
+ private RandomAccessFile rootOutRandomAccessFile;
+ private FSDataOutputStream rootOut;
+
+ private boolean isLocal;
+
private int level;
- private int loadNum = 4096;
+ private int loadNum;
private Path fileName;
+ // Whether keys arrive already sorted (written directly) or not (collected first, written on close)
+ private boolean sorted;
+ private boolean writeRootIndex;
private final Schema keySchema;
private final TupleComparator compartor;
private final KeyOffsetCollector collector;
private KeyOffsetCollector rootCollector;
+ private ByteBuf indexBuffer;
+ private ByteBuf rootIndexBuffer;
private Tuple firstKey;
private Tuple lastKey;
private RowStoreEncoder rowStoreEncoder;
-
- // private Tuple lastestKey = null;
+ private int loadCount;
+ private int entrySize;
+ private int rootEntrySize;
/**
* constructor
@@ -104,25 +130,63 @@ public class BSTIndex implements IndexMethod {
* @throws java.io.IOException
*/
public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
+ TupleComparator comparator, boolean sorted) throws IOException {
this.fileName = fileName;
this.level = level;
+ this.writeRootIndex = level == TWO_LEVEL_INDEX;
this.keySchema = keySchema;
this.compartor = comparator;
this.collector = new KeyOffsetCollector(comparator);
+ this.rootCollector = new KeyOffsetCollector(this.compartor);
this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
+ this.sorted = sorted;
+ this.indexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder());
+ this.rootIndexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder());
+ this.loadCount = loadNum = conf.getInt(WRITER_INDEX_LOAD, DEFAULT_INDEX_LOAD);
}
- public void setLoadNum(int loadNum) {
+ public void setLoadNum(int loadNum) {
this.loadNum = loadNum;
+ this.loadCount = loadNum;
}
- public void open() throws IOException {
- fs = fileName.getFileSystem(conf);
- if (fs.exists(fileName)) {
- throw new IOException("ERROR: index file (" + fileName + " already exists");
+ public void init() throws IOException {
+ FileSystem fs = fileName.getFileSystem(conf);
+ Path rootPath = new Path(fileName + ".root");
+ if (fs.exists(fileName) || fs.exists(rootPath)) {
+ throw new IOException("ERROR: index file " + fileName + " or " + rootPath + " already exists");
+ }
+
+ if (fs instanceof LocalFileSystem) {
+ File outFile;
+ try {
+ if (!fs.exists(fileName.getParent())) {
+ fs.mkdirs(fileName.getParent());
+ }
+
+ if (fileName.toUri().getScheme() != null) {
+ outFile = new File(fileName.toUri());
+ } else {
+ outFile = new File(fileName.toString());
+ }
+ } catch (IllegalArgumentException iae) {
+ throw new IOException(iae);
+ }
+
+ outRandomAccessFile = new RandomAccessFile(outFile, "rw");
+ outChannel = outRandomAccessFile.getChannel();
+
+ if (writeRootIndex) {
+ rootOutRandomAccessFile = new RandomAccessFile(new File(outFile.getAbsolutePath() + ".root"), "rw");
+ rootOutChannel = rootOutRandomAccessFile.getChannel();
+ }
+ isLocal = true;
+ } else {
+ out = fs.create(fileName, true);
+ if (writeRootIndex) {
+ rootOut = fs.create(rootPath, true);
+ }
}
- out = fs.create(fileName);
}
@Override
@@ -140,7 +204,83 @@ public class BSTIndex implements IndexMethod {
lastKey = keyTuple;
}
- collector.put(keyTuple, offset);
+ if (sorted) {
+ /* root index writing */
+ if (writeRootIndex) {
+ if (loadCount == loadNum) {
+ loadCount = 0;
+ writeRootIndex(rootIndexBuffer, keyTuple, filePos + indexBuffer.writerIndex());
+ }
+ loadCount++;
+ }
+
+ /* leaf index writing */
+ writeIndex(indexBuffer, keyTuple, offset);
+ } else {
+ collector.put(keyTuple, offset);
+ }
+ }
+
+ private void writeIndex(ByteBuf byteBuf, Tuple tuple, Long... offsets) throws IOException {
+
+ byte[] buf = rowStoreEncoder.toBytes(tuple);
+ int size = buf.length + 8 + (offsets.length * 8);
+ if (!byteBuf.isWritable(size)) {
+ byteBuf.ensureWritable(size);
+ }
+
+ // key writing
+ byteBuf.writeInt(buf.length);
+ byteBuf.writeBytes(buf);
+
+ //offset num writing
+ byteBuf.writeInt(offsets.length);
+
+ /* offset writing */
+ for (long offset : offsets) {
+ byteBuf.writeLong(offset);
+ }
+
+ entrySize++;
+ // flush to file and reset buffer
+ if (byteBuf.writerIndex() >= BUFFER_SIZE) {
+ filePos += flushBuffer(byteBuf, outChannel, out);
+ }
+ }
+
+ private void writeRootIndex(ByteBuf byteBuf, Tuple tuple, long offset) throws IOException {
+ byte[] buf = rowStoreEncoder.toBytes(tuple);
+ int size = buf.length + 12;
+ if (!byteBuf.isWritable(size)) {
+ byteBuf.ensureWritable(size);
+ }
+
+ // key writing
+ byteBuf.writeInt(buf.length);
+ byteBuf.writeBytes(buf);
+
+ // leaf offset writing
+ byteBuf.writeLong(offset);
+
+ rootEntrySize++;
+ // flush to file and reset buffer
+ if (byteBuf.writerIndex() >= BUFFER_SIZE) {
+ flushBuffer(byteBuf, rootOutChannel, rootOut);
+ }
+ }
+
+ private int flushBuffer(ByteBuf byteBuf, FileChannel channel, FSDataOutputStream out) throws IOException {
+ // write buffer to file
+ int readableBytes = byteBuf.readableBytes();
+ if (readableBytes > 0) {
+ if (isLocal) {
+ byteBuf.readBytes(channel, readableBytes);
+ } else {
+ byteBuf.readBytes(out, readableBytes);
+ }
+ byteBuf.clear();
+ }
+ return readableBytes;
}
public TupleComparator getComparator() {
@@ -148,107 +288,128 @@ public class BSTIndex implements IndexMethod {
}
public void flush() throws IOException {
- out.flush();
+ if (out != null) {
+ flushBuffer(indexBuffer, outChannel, out);
+ out.flush();
+ }
+
+ if (writeRootIndex && rootOut != null) {
+ flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+ rootOut.flush();
+ }
}
- public void writeHeader(int entryNum) throws IOException {
+ public void writeFooter(int entryNum) throws IOException {
+ indexBuffer.clear();
+
+ long startPosition = filePos;
// schema
byte [] schemaBytes = keySchema.getProto().toByteArray();
- out.writeInt(schemaBytes.length);
- out.write(schemaBytes);
-
// comparator
byte [] comparatorBytes = compartor.getProto().toByteArray();
- out.writeInt(comparatorBytes.length);
- out.write(comparatorBytes);
+
+ int size = schemaBytes.length + comparatorBytes.length + 16;
+ if(!indexBuffer.isWritable(size)) {
+ indexBuffer.ensureWritable(size);
+ }
+
+ indexBuffer.writeInt(schemaBytes.length);
+ indexBuffer.writeBytes(schemaBytes);
+
+ indexBuffer.writeInt(comparatorBytes.length);
+ indexBuffer.writeBytes(comparatorBytes);
// level
- out.writeInt(this.level);
+ indexBuffer.writeInt(this.level);
// entry
- out.writeInt(entryNum);
+ indexBuffer.writeInt(entryNum);
if (entryNum > 0) {
byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
- out.writeInt(minBytes.length);
- out.write(minBytes);
byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
- out.writeInt(maxBytes.length);
- out.write(maxBytes);
- }
- out.flush();
- }
- public void close() throws IOException {
- /* two level initialize */
- if (this.level == TWO_LEVEL_INDEX) {
- rootCollector = new KeyOffsetCollector(this.compartor);
+ size = minBytes.length + maxBytes.length + 12;
+ if(!indexBuffer.isWritable(size)) {
+ filePos += flushBuffer(indexBuffer, outChannel, out);
+ indexBuffer.ensureWritable(size);
+ }
+
+ indexBuffer.writeInt(minBytes.length);
+ indexBuffer.writeBytes(minBytes);
+ indexBuffer.writeInt(maxBytes.length);
+ indexBuffer.writeBytes(maxBytes);
}
- /* data writing phase */
- TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
- Set<Tuple> keySet = keyOffsetMap.keySet();
+ // write footer length
+ int footerSize = (int) (filePos + indexBuffer.readableBytes() + 4 - startPosition);
+ indexBuffer.writeInt(footerSize);
- int entryNum = keySet.size();
- writeHeader(entryNum);
+ filePos += flushBuffer(indexBuffer, outChannel, out);
+ }
- int loadCount = this.loadNum - 1;
- for (Tuple key : keySet) {
+ public void close() throws IOException {
+ /* data writing phase */
+ try {
+ if (sorted) {
+ // write remaining data to file
+ filePos += flushBuffer(indexBuffer, outChannel, out);
+ } else {
+ // flush collected index data
+ TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
+ for (Map.Entry<Tuple, LinkedList<Long>> entry : keyOffsetMap.entrySet()) {
+
+ /* two-level index: record a root entry for the first key of every loadNum leaf entries */
+ if (writeRootIndex) {
+ if (loadCount == loadNum) {
+ loadCount = 0;
+ rootCollector.put(entry.getKey(), filePos + indexBuffer.writerIndex());
+ }
+ loadCount++;
+ }
- if (this.level == TWO_LEVEL_INDEX) {
- loadCount++;
- if (loadCount == this.loadNum) {
- rootCollector.put(key, out.getPos());
- loadCount = 0;
+ LinkedList<Long> offsetList = entry.getValue();
+ writeIndex(indexBuffer, entry.getKey(), offsetList.toArray(new Long[offsetList.size()]));
}
+ filePos += flushBuffer(indexBuffer, outChannel, out);
+ collector.clear();
}
- /* key writing */
- byte[] buf = rowStoreEncoder.toBytes(key);
- out.writeInt(buf.length);
- out.write(buf);
-
- /**/
- LinkedList<Long> offsetList = keyOffsetMap.get(key);
- /* offset num writing */
- int offsetSize = offsetList.size();
- out.writeInt(offsetSize);
- /* offset writing */
- for (Long offset : offsetList) {
- out.writeLong(offset);
- }
- }
- out.flush();
- out.close();
- keySet.clear();
- collector.clear();
+ writeFooter(entrySize);
- FSDataOutputStream rootOut = null;
- /* root index creating phase */
- if (this.level == TWO_LEVEL_INDEX) {
- TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
- keySet = rootMap.keySet();
-
- rootOut = fs.create(new Path(fileName + ".root"));
- rootOut.writeInt(this.loadNum);
- rootOut.writeInt(keySet.size());
-
- /* root key writing */
- for (Tuple key : keySet) {
- byte[] buf = rowStoreEncoder.toBytes(key);
- rootOut.writeInt(buf.length);
- rootOut.write(buf);
-
- LinkedList<Long> offsetList = rootMap.get(key);
- if (offsetList.size() > 1 || offsetList.size() == 0) {
- throw new IOException("Why root index doen't have one offset?");
- }
- rootOut.writeLong(offsetList.getFirst());
+ /* root index creating phase */
+ if (writeRootIndex) {
+ if (sorted) {
+ //write root index header
+ rootIndexBuffer.writeInt(loadNum);
+ rootIndexBuffer.writeInt(rootEntrySize);
+ // write remaining data to file
+ flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+ } else {
+ TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
+ rootIndexBuffer.clear();
+ /* root key writing */
+ for (Map.Entry<Tuple, LinkedList<Long>> entry : rootMap.entrySet()) {
+ LinkedList<Long> offsetList = entry.getValue();
+ if (offsetList.size() != 1) {
+ throw new IOException("Why root index doen't have one offset? offsets:" + offsetList.size());
+ }
+ writeRootIndex(rootIndexBuffer, entry.getKey(), offsetList.getFirst());
+ }
+
+ //write root index header
+ rootIndexBuffer.writeInt(this.loadNum);
+ rootIndexBuffer.writeInt(rootEntrySize);
+
+ flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+ rootCollector.clear();
+ }
}
- rootOut.flush();
- rootOut.close();
+ } finally {
+ indexBuffer.release();
+ rootIndexBuffer.release();
- keySet.clear();
- rootCollector.clear();
+ FileUtil.cleanupAndthrowIfFailed(outChannel, outRandomAccessFile, out,
+ rootOutChannel, rootOutRandomAccessFile, rootOut);
}
}
@@ -289,7 +450,6 @@ public class BSTIndex implements IndexMethod {
private FileSystem fs;
private FSDataInputStream indexIn;
- private FSDataInputStream subIn;
private int level;
private int entryNum;
@@ -301,6 +461,7 @@ public class BSTIndex implements IndexMethod {
private int rootCursor;
private int keyCursor;
private int offsetCursor;
+ private long dataLength;
// mutex
private final Object mutex = new Object();
@@ -319,10 +480,12 @@ public class BSTIndex implements IndexMethod {
this.keySchema = keySchema;
this.comparator = comparator;
this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
+ open();
}
public BSTIndexReader(final Path fileName) throws IOException {
this.fileName = fileName;
+ open();
}
public Schema getKeySchema() {
@@ -333,11 +496,21 @@ public class BSTIndex implements IndexMethod {
return this.comparator;
}
- private void readHeader() throws IOException {
+ private void loadFooter() throws IOException {
+ long fileLength = fs.getFileStatus(this.fileName).getLen();
+
+ //read footer
+ indexIn.seek(fileLength - 4);
+ int footerSize = indexIn.readInt();
+ dataLength = fileLength - footerSize;
+ ByteBuf byteBuf = Unpooled.buffer(footerSize, footerSize);
+ indexIn.seek(dataLength);
+ byteBuf.writeBytes(indexIn, footerSize);
+
// schema
- int schemaByteSize = indexIn.readInt();
+ int schemaByteSize = byteBuf.readInt();
byte [] schemaBytes = new byte[schemaByteSize];
- StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+ byteBuf.readBytes(schemaBytes);
SchemaProto.Builder builder = SchemaProto.newBuilder();
builder.mergeFrom(schemaBytes);
@@ -346,30 +519,36 @@ public class BSTIndex implements IndexMethod {
this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
// comparator
- int compByteSize = indexIn.readInt();
+ int compByteSize = byteBuf.readInt();
byte [] compBytes = new byte[compByteSize];
- StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
+ byteBuf.readBytes(compBytes);
TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
compProto.mergeFrom(compBytes);
this.comparator = new BaseTupleComparator(compProto.build());
// level
- this.level = indexIn.readInt();
+ this.level = byteBuf.readInt();
// entry
- this.entryNum = indexIn.readInt();
+ this.entryNum = byteBuf.readInt();
if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
- byte [] minBytes = new byte[indexIn.readInt()];
- StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
+ byte [] minBytes = new byte[byteBuf.readInt()];
+ byteBuf.readBytes(minBytes);
this.firstKey = rowStoreDecoder.toTuple(minBytes);
- byte [] maxBytes = new byte[indexIn.readInt()];
- StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
+ byte [] maxBytes = new byte[byteBuf.readInt()];
+ byteBuf.readBytes(maxBytes);
this.lastKey = rowStoreDecoder.toTuple(maxBytes);
}
+ byteBuf.release();
+ }
+
+ public void init() throws IOException {
+ open();
+ fillData();
}
- public void open()
+ private void open()
throws IOException {
/* init the index file */
fs = fileName.getFileSystem(conf);
@@ -378,11 +557,11 @@ public class BSTIndex implements IndexMethod {
}
indexIn = fs.open(this.fileName);
- readHeader();
- fillData();
+ loadFooter();
}
private void fillData() throws IOException {
+ indexIn.seek(0);
/* load on memory */
if (this.level == TWO_LEVEL_INDEX) {
@@ -391,13 +570,16 @@ public class BSTIndex implements IndexMethod {
throw new FileNotFoundException("root index did not created");
}
- subIn = indexIn;
- indexIn = fs.open(rootPath);
+ try (FSDataInputStream rootIndexIn = fs.open(rootPath)) {
+ long fileLength = fs.getFileStatus(rootPath).getLen();
/* root index header reading : type => loadNum => indexSize */
- this.loadNum = indexIn.readInt();
- this.entryNum = indexIn.readInt();
- /**/
- fillRootIndex(entryNum, indexIn);
+ rootIndexIn.seek(fileLength - 8);
+ this.loadNum = rootIndexIn.readInt();
+ this.entryNum = rootIndexIn.readInt();
+
+ rootIndexIn.seek(0);
+ fillRootIndex(entryNum, rootIndexIn);
+ }
} else {
fillLeafIndex(entryNum, indexIn, -1);
@@ -455,7 +637,7 @@ public class BSTIndex implements IndexMethod {
} else {
if (offsetIndex.length -1 > rootCursor) {
rootCursor++;
- fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
+ fillLeafIndex(loadNum + 1, indexIn, this.offsetIndex[rootCursor]);
keyCursor = 1;
offsetCursor = 0;
} else {
@@ -485,6 +667,10 @@ public class BSTIndex implements IndexMethod {
byte[] buf;
for (int i = 0; i < entryNum; i++) {
counter++;
+
+ if (in.getPos() >= dataLength)
+ throw new EOFException("Path:" + fileName + ", Pos: " + in.getPos() + ", Data len:" + dataLength);
+
buf = new byte[in.readInt()];
StorageUtil.readFully(in, buf, 0, buf.length);
dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
@@ -494,10 +680,10 @@ public class BSTIndex implements IndexMethod {
for (int j = 0; j < offsetNum; j++) {
this.offsetSubIndex[i][j] = in.readLong();
}
-
}
} catch (IOException e) {
+ //TODO this block should fix correctly
counter--;
if (pos != -1) {
in.seek(pos);
@@ -567,9 +753,9 @@ public class BSTIndex implements IndexMethod {
} else {
rootCursor = 0;
}
- fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
+ fillLeafIndex(loadNum, indexIn, this.offsetIndex[rootCursor]);
pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-
+
return pos;
}
@@ -618,7 +804,6 @@ public class BSTIndex implements IndexMethod {
@Override
public void close() throws IOException {
this.indexIn.close();
- this.subIn.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 30cea60..a9d8ce2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -122,7 +122,7 @@ public class TestBSTIndex {
BSTIndex.TWO_LEVEL_INDEX,
keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -147,7 +147,7 @@ public class TestBSTIndex {
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
- reader.open();
+ reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -197,7 +197,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
Tuple tuple;
long offset;
@@ -227,7 +227,7 @@ public class TestBSTIndex {
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -290,7 +290,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -315,7 +315,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
keyTuple.put(0, DatumFactory.createInt8(i));
keyTuple.put(1, DatumFactory.createFloat8(i));
@@ -363,7 +363,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -388,7 +388,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -456,7 +456,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -481,7 +481,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -538,7 +538,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -565,7 +565,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -624,7 +624,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -649,7 +649,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
Tuple min = reader.getFirstKey();
assertEquals(5, min.getInt4(0));
@@ -731,7 +731,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -756,7 +756,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
Thread[] threads = new Thread[5];
ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
@@ -812,9 +812,9 @@ public class TestBSTIndex {
BSTIndex bst = new BSTIndex(conf);
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"),
- BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+ BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -841,7 +841,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
@@ -906,7 +906,7 @@ public class TestBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
"testFindNextKeyValueDescOrder_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -932,7 +932,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat + ".idx"),
keySchema, comp);
- reader.open();
+ reader.init();
assertEquals(keySchema, reader.getKeySchema());
assertEquals(comp, reader.getComparator());
@@ -965,4 +965,97 @@ public class TestBSTIndex {
reader.close();
scanner.close();
}
+
+ @Test
+ public void testFindValueASCOrder() throws IOException {
+ meta = CatalogUtil.newTableMeta(dataFormat);
+
+ Path tablePath = new Path(testDir, "testFindValue_" + dataFormat);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(meta, schema, tablePath);
+ appender.init();
+ Tuple tuple;
+
+ // order by asc
+ for (int i = 0; i < TUPLE_NUM; i++) {
+ tuple = new VTuple(5);
+ tuple.put(0, DatumFactory.createInt4(i));
+ tuple.put(1, DatumFactory.createInt8(i));
+ tuple.put(2, DatumFactory.createFloat8(i));
+ tuple.put(3, DatumFactory.createFloat4(i));
+ tuple.put(4, DatumFactory.createText("field_" + i));
+ appender.addTuple(tuple);
+ }
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+ SortSpec[] sortKeys = new SortSpec[2];
+ sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+ sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+ Schema keySchema = new Schema();
+ keySchema.addColumn(new Column("long", Type.INT8));
+ keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+ BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+ BSTIndex bst = new BSTIndex(conf);
+ BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + dataFormat + ".idx"),
+ BSTIndex.TWO_LEVEL_INDEX,
+ keySchema, comp, true);
+ creater.setLoadNum(LOAD_NUM);
+ creater.init();
+
+ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ scanner.init();
+
+ Tuple keyTuple;
+ long offset;
+ while (true) {
+ keyTuple = new VTuple(2);
+ offset = scanner.getNextOffset();
+ tuple = scanner.next();
+ if (tuple == null) break;
+
+ keyTuple.put(0, tuple.asDatum(1));
+ keyTuple.put(1, tuple.asDatum(2));
+ creater.write(keyTuple, offset);
+ }
+
+ creater.flush();
+ creater.close();
+ scanner.close();
+
+ tuple = new VTuple(keySchema.size());
+ BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
+ reader.init();
+ scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ scanner.init();
+
+ for (int i = 0; i < TUPLE_NUM - 1; i++) {
+ tuple.put(0, DatumFactory.createInt8(i));
+ tuple.put(1, DatumFactory.createFloat8(i));
+ long offsets = reader.find(tuple);
+ scanner.seek(offsets);
+ tuple = scanner.next();
+ assertTrue("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", (i) == (tuple.getInt8(1)));
+ assertTrue("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", (i) == (tuple.getFloat8(2)));
+
+ offsets = reader.next();
+ if (offsets == -1) {
+ continue;
+ }
+ scanner.seek(offsets);
+ tuple = scanner.next();
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt4(0)));
+ assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt8(1)));
+ }
+ reader.close();
+ scanner.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 8262073..b2ca5b8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -109,7 +109,7 @@ public class TestSingleCSVFileBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
"FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
.getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -135,7 +135,7 @@ public class TestSingleCSVFileBSTIndex {
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
"FindValueInCSV.idx"), keySchema, comp);
- reader.open();
+ reader.init();
fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
.getSeekableScanner(meta, schema, tablet.getProto(), schema);
fileScanner.init();
@@ -200,7 +200,7 @@ public class TestSingleCSVFileBSTIndex {
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
- creater.open();
+ creater.init();
SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
.getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -223,7 +223,7 @@ public class TestSingleCSVFileBSTIndex {
fileScanner.close();
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
- reader.open();
+ reader.init();
fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
.getSeekableScanner(meta, schema, tablet.getProto(), schema);
fileScanner.init();