You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 02:46:35 UTC
incubator-kylin git commit: MemDiskStore coding done, pending test
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-786 7945a1e49 -> 294cb8097
MemDiskStore coding done, pending test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/294cb809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/294cb809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/294cb809
Branch: refs/heads/KYLIN-786
Commit: 294cb809732d407a3b5c382a64646cf527f7c62d
Parents: 7945a1e
Author: Yang Li <li...@apache.org>
Authored: Thu May 28 08:46:11 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu May 28 08:46:11 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 7 +-
.../kylin/storage/gridtable/GTRowBlock.java | 88 ++--
.../gridtable/memstore/GTMemDiskStore.java | 509 +++++++++++++++++++
.../gridtable/memstore/MemDiskStore.java | 357 -------------
.../storage/gridtable/DictGridTableTest.java | 2 +-
.../storage/gridtable/GTMemDiskStoreTest.java | 5 +
.../storage/gridtable/SimpleGridTableTest.java | 6 +-
7 files changed, 580 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index dab5607..36a0998 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -52,6 +52,10 @@ public class MemoryBudgetController {
public int getTotalReservedMB() {
return totalReservedMB;
}
+
+    /** Returns the total budget minus the total reserved amount, in MB. */
+    public int getRemainingBudgetMB() {
+        final int remaining = totalBudgetMB - totalReservedMB;
+        return remaining;
+    }
public void reserve(MemoryConsumer consumer, int requestMB) {
ConsumerEntry entry = booking.get(consumer);
@@ -78,6 +82,7 @@ public class MemoryBudgetController {
if (delta < 0) {
this.notify();
}
+ logger.debug(entry + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
}
synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
@@ -104,7 +109,7 @@ public class MemoryBudgetController {
break;
try {
- logger.debug("Memory budget has " + (totalBudgetMB - totalReservedMB) + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+ logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
this.wait(200);
} catch (InterruptedException e) {
logger.error("Interrupted while wait free memory", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 4a68659..eed5698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.gridtable;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -40,36 +41,36 @@ public class GTRowBlock {
this.cellBlocks[i] = new ByteArray();
}
}
-
+
public int getSequenceId() {
return seqId;
}
-
+
public ByteArray getPrimaryKey() {
return primaryKey;
}
-
+
public ByteArray getCellBlock(int i) {
return cellBlocks[i];
}
-
+
public Writer getWriter() {
return new Writer();
}
public class Writer {
ByteBuffer[] cellBlockBuffers;
-
+
Writer() {
cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
for (int i = 0; i < cellBlockBuffers.length; i++) {
cellBlockBuffers[i] = cellBlocks[i].asBuffer();
}
}
-
+
public void copyFrom(GTRowBlock other) {
assert info == other.info;
-
+
seqId = other.seqId;
nRows = other.nRows;
primaryKey.copyFrom(other.primaryKey);
@@ -89,7 +90,7 @@ public class GTRowBlock {
}
nRows++;
}
-
+
public void readyForFlush() {
for (int i = 0; i < cellBlocks.length; i++) {
cellBlocks[i].setLength(cellBlockBuffers[i].position());
@@ -104,21 +105,21 @@ public class GTRowBlock {
}
}
}
-
+
public Reader getReader() {
return new Reader(info.colBlocksAll);
}
-
+
public Reader getReader(BitSet selectedColBlocks) {
return new Reader(selectedColBlocks);
}
-
+
public class Reader {
int cur;
ByteBuffer primaryKeyBuffer;
ByteBuffer[] cellBlockBuffers;
BitSet selectedColBlocks;
-
+
Reader(BitSet selectedColBlocks) {
primaryKeyBuffer = primaryKey.asBuffer();
cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
@@ -127,15 +128,15 @@ public class GTRowBlock {
}
this.selectedColBlocks = selectedColBlocks;
}
-
+
public boolean hasNext() {
return cur < nRows;
}
-
+
public void fetchNext(GTRecord result) {
if (hasNext() == false)
throw new IllegalArgumentException();
-
+
for (int c = selectedColBlocks.nextSetBit(0); c >= 0; c = selectedColBlocks.nextSetBit(c + 1)) {
result.loadCellBlock(c, cellBlockBuffers[c]);
}
@@ -153,7 +154,7 @@ public class GTRowBlock {
return copy;
}
-
+
public boolean isEmpty() {
return nRows == 0;
}
@@ -164,7 +165,7 @@ public class GTRowBlock {
else
return nRows > 0;
}
-
+
public int getNumberOfRows() {
return nRows;
}
@@ -172,36 +173,41 @@ public class GTRowBlock {
public void setNumberOfRows(int nRows) {
this.nRows = nRows;
}
-
- // TODO export / load should optimize for disabled row block
+ // ============================================================================
+
public int exportLength() {
- int len = 4 + 4 + (4 + primaryKey.length());
+ int len = 4; // seq Id
+ if (info.isRowBlockEnabled())
+ len += 4; // nRows
+ len += 4 + primaryKey.length(); // PK byte array
for (ByteArray array : cellBlocks) {
- len += 4 + array.length();
+ len += 4 + array.length(); // cell block byte array
}
return len;
}
- public void export(DataOutputStream dataOutputStream) throws IOException {
- dataOutputStream.writeInt(seqId);
- dataOutputStream.writeInt(nRows);
- export(dataOutputStream, primaryKey);
+ /** write data to given output stream, like serialize */
+ public void export(DataOutputStream out) throws IOException {
+ out.writeInt(seqId);
+ if (info.isRowBlockEnabled())
+ out.writeInt(nRows);
+ export(out, primaryKey);
for (ByteArray cb : cellBlocks) {
- export(dataOutputStream, cb);
+ export(out, cb);
}
}
- private void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
- dataOutputStream.writeInt(array.length());
- dataOutputStream.write(array.array(), array.offset(), array.length());
+ private void export(DataOutputStream out, ByteArray array) throws IOException {
+ out.writeInt(array.length());
+ out.write(array.array(), array.offset(), array.length());
}
-
/** write data to given buffer, like serialize */
public void export(ByteBuffer buf) {
buf.putInt(seqId);
- buf.putInt(nRows);
+ if (info.isRowBlockEnabled())
+ buf.putInt(nRows);
export(primaryKey, buf);
for (ByteArray cb : cellBlocks) {
export(cb, buf);
@@ -212,11 +218,29 @@ public class GTRowBlock {
buf.putInt(array.length());
buf.put(array.array(), array.offset(), array.length());
}
+
+    /** read data from given input stream, like deserialize; the inverse of export(DataOutputStream) */
+    public void importFrom(DataInputStream in) throws IOException {
+        seqId = in.readInt();
+        // nRows is only serialized when row blocks are enabled, otherwise a block holds a single row
+        nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+        importFrom(in, primaryKey);
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            ByteArray cb = cellBlocks[i];
+            importFrom(in, cb);
+        }
+    }
+
+    /**
+     * Reads one length-prefixed byte array (as written by export(DataOutputStream, ByteArray)) into
+     * result, reusing its backing array when it is large enough and allocating a new one otherwise.
+     *
+     * @throws IOException on a negative length, a premature end of stream, or an underlying I/O error
+     */
+    private void importFrom(DataInputStream in, ByteArray result) throws IOException {
+        int len = in.readInt();
+        if (len < 0)
+            throw new IOException("Negative byte array length " + len);
+
+        byte[] data = result.array();
+        if (data == null || data.length < len) {
+            data = new byte[len];
+        }
+        // read() may return fewer bytes than requested; readFully blocks until all len bytes arrive
+        in.readFully(data, 0, len);
+        result.set(data, 0, len);
+    }
/** change pointers to point to data in given buffer, UNLIKE deserialize */
public void load(ByteBuffer buf) {
seqId = buf.getInt();
- nRows = buf.getInt();
+ nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
load(primaryKey, buf);
for (int i = 0; i < info.colBlocks.length; i++) {
ByteArray cb = cellBlocks[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
new file mode 100644
index 0000000..dfa9cd8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -0,0 +1,509 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A grid table store that keeps all row blocks in a single disk file, caching written bytes in
+ * memory chunks for as long as the {@link MemoryBudgetController} grants budget.
+ * <p>
+ * Writes go to in-memory chunks first, and a background thread flushes them to the disk file. Once
+ * budget runs out, writes go directly to the disk file. Reads are served from memory chunks when the
+ * requested range is still cached, and from the disk file otherwise. Every byte ends up on disk, so
+ * flushed memory chunks can be released through {@code freeUp}.
+ * <p>
+ * Reads and writes are not expected to run at the same time.
+ */
+public class GTMemDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+    private static final int STREAM_BUFFER_SIZE = 8192;
+    private static final int MEM_CHUNK_SIZE_MB = 1;
+
+    final GTInfo info;
+    final MemPart memPart;
+    final DiskPart diskPart;
+
+    /** Creates a store backed by a new temp file, which is deleted when the store is closed. */
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+    }
+
+    /** Creates a store backed by the given file, which is kept when the store is closed. */
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+        this(info, budgetCtrl, diskFile, false);
+    }
+
+    /**
+     * @param info       table info of the stored row blocks
+     * @param budgetCtrl controller that grants memory for the in-memory chunks
+     * @param diskFile   backing file, created if absent
+     * @param delOnClose whether diskFile is deleted when the store is closed
+     */
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile, delOnClose);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /** Discards all existing content and returns a writer that starts at offset 0. */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return new Writer(0);
+    }
+
+    /** Returns a writer that continues after the existing content; fillLast is not used. */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return new Writer(diskPart.tailOffset);
+    }
+
+    /** Returns a scanner over all stored row blocks; the range and push-down arguments are not used. */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return new Reader();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            memPart.close();
+        } finally {
+            diskPart.close(); // release the file even if the async flush failed
+        }
+    }
+
+    /**
+     * Reads back every row block sequentially from offset 0, preferring cached memory chunks over the
+     * disk file. The same GTRowBlock instance is reused for each block returned by next().
+     */
+    private class Reader implements IGTStoreScanner {
+
+        final DataInputStream din;
+        long diskOffset = 0;
+        long memRead = 0;
+        long diskRead = 0;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader() {
+            logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+                MemChunk memChunk;
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    else
+                        return tmp[0] & 0xFF; // must be 0..255, a sign-extended 0xFF would look like EOF
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    if (len == 0)
+                        return 0;
+                    if (available() <= 0)
+                        return -1;
+
+                    if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
+                        memChunk = memPart.seekMemChunk(diskOffset);
+                    }
+
+                    int lenToGo = Math.min(available(), len);
+
+                    int nRead = 0;
+                    while (lenToGo > 0) {
+                        int n;
+                        if (memChunk != null) {
+                            if (memChunk.headOffset() > diskOffset) {
+                                memChunk = null;
+                                continue;
+                            }
+                            if (diskOffset >= memChunk.tailOffset()) {
+                                memChunk = memChunk.next;
+                                continue;
+                            }
+                            int chunkOffset = (int) (diskOffset - memChunk.headOffset());
+                            n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
+                            System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                            memRead += n;
+                        } else {
+                            n = diskPart.read(diskOffset, b, off, lenToGo);
+                            if (n <= 0)
+                                break; // unexpected end of file, report what was read so far
+                            diskRead += n;
+                        }
+                        off += n; // advance the destination, otherwise the next piece overwrites this one
+                        lenToGo -= n;
+                        nRead += n;
+                        diskOffset += n;
+                    }
+                    return nRead > 0 ? nRead : -1;
+                }
+
+                @Override
+                public int available() throws IOException {
+                    long remaining = diskPart.tailOffset - diskOffset;
+                    // clamp, a plain (int) cast of a large long can turn negative
+                    return (int) Math.max(0, Math.min(Integer.MAX_VALUE, remaining));
+                }
+            };
+
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            din.close();
+            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk");
+        }
+
+    }
+
+    /**
+     * Serializes row blocks with GTRowBlock.export(DataOutputStream), starting at a given offset. Bytes
+     * go to the memory part while it accepts them, and straight to the disk part after that.
+     */
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        long diskOffset;
+
+        /**
+         * @param startOffset 0 to discard all existing content (rebuild), or the current disk tail
+         *                    offset to keep existing content and write after it (append)
+         */
+        Writer(long startOffset) throws IOException {
+            diskOffset = startOffset;
+            if (startOffset == 0) {
+                // only a rebuild drops old content; an append must keep what is already stored
+                memPart.clear();
+                diskPart.clear();
+            }
+            logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+
+            memPart.activateMemWrite();
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+                boolean memPartActivated = true;
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    while (length > 0) {
+                        if (memPartActivated) {
+                            int n = memPart.write(bytes, offset, length, diskOffset);
+                            offset += n;
+                            length -= n;
+                            diskOffset += n;
+                            if (n == 0) {
+                                // memory part refused (no budget, or not at its tail): use disk from now on
+                                memPartActivated = false;
+                            }
+                        } else {
+                            diskPart.write(diskOffset, bytes, offset, length);
+                            diskOffset += length;
+                            offset += length;
+                            length = 0; // everything left has been written, exit the loop
+                        }
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        @Override
+        public void close() throws IOException {
+            dout.close(); // pushes buffered bytes through the memory / disk parts
+            memPart.finishAsyncFlush();
+            assert diskOffset == diskPart.tailOffset;
+            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset);
+        }
+    }
+
+    /** A fixed-size in-memory buffer that caches the file bytes [headOffset, tailOffset). */
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        boolean isFull() {
+            return length == data.length;
+        }
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return diskOffset + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+    }
+
+    /**
+     * Linked list of memory chunks holding a contiguous range of the file. A background thread flushes
+     * the chunks to disk, and flushed chunks can be released through freeUp() when budget is needed.
+     */
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk;
+        long asyncFlushDiskOffset;
+        Throwable asyncFlushException;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        /** File offset of the first cached byte, or 0 when nothing is cached. */
+        synchronized long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        /** File offset just past the last cached byte, or 0 when nothing is cached. */
+        synchronized long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        /** Returns the cached chunk that contains diskOffset, or null if that byte is not in memory. */
+        synchronized public MemChunk seekMemChunk(long diskOffset) {
+            for (MemChunk c = firstChunk; c != null && c.headOffset() <= diskOffset; c = c.next) {
+                if (diskOffset < c.tailOffset())
+                    return c;
+            }
+            return null;
+        }
+
+        /**
+         * Caches up to length bytes at diskOffset and schedules them for the async flush.
+         *
+         * @return number of bytes accepted; 0 if memory writes are off, diskOffset is not the cached
+         *         tail, or no budget is left for a new chunk
+         */
+        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            if (writeActivated == false)
+                return 0;
+
+            // write is only expected at the tail
+            if (diskOffset != tailOffset())
+                return 0;
+
+            if (chunkCount == 0 || lastChunk.isFull()) {
+                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+                if (writeActivated == false)
+                    return 0;
+            }
+
+            int n = Math.min(lastChunk.freeSpace(), length);
+            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+            lastChunk.length += n;
+
+            asyncFlush(lastChunk, diskOffset, n);
+
+            return n;
+        }
+
+        // called with the monitor held: records where flushing must resume and starts the flusher once
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    @Override
+                    public void run() {
+                        asyncFlushException = null;
+                        logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        try {
+                            while (writeActivated) {
+                                flushToDisk();
+                                Thread.sleep(10);
+                            }
+                            flushToDisk();
+                        } catch (Throwable ex) {
+                            asyncFlushException = ex;
+                        }
+                        logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+                    }
+                };
+                asyncFlusher.start();
+            }
+        }
+
+        // writes all not-yet-flushed cached bytes to disk; the monitor is held only to pick the next piece
+        private void flushToDisk() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+
+            while (true) {
+                data = null;
+                synchronized (memPart) {
+                    asyncFlushDiskOffset += length; // bytes written in last loop
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                diskPart.write(asyncFlushDiskOffset, data, offset, length);
+            }
+        }
+
+        private void allocateNewMemChunk(long diskOffset) {
+            try {
+                budgetCtrl.reserve(this, (chunkCount + 1) * MEM_CHUNK_SIZE_MB);
+            } catch (NotEnoughBudgetException ex) {
+                deactivateMemWrite();
+                return;
+            }
+
+            MemChunk chunk = new MemChunk();
+            chunk.diskOffset = diskOffset;
+            chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+            if (chunkCount == 0) {
+                firstChunk = lastChunk = chunk;
+            } else {
+                lastChunk.next = chunk;
+                lastChunk = chunk;
+            }
+            chunkCount++;
+        }
+
+        /**
+         * Stops memory writes, waits for the background flusher to write every cached byte to disk, and
+         * rethrows any error the flusher hit (wrapped in an IOException if needed).
+         */
+        public void finishAsyncFlush() throws IOException {
+            deactivateMemWrite();
+
+            Thread flusher;
+            synchronized (this) {
+                flusher = asyncFlusher;
+            }
+            if (flusher == null)
+                return;
+
+            // join WITHOUT holding this monitor: the flusher needs it in flushToDisk() to finish
+            try {
+                flusher.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.warn("", e);
+            }
+
+            Throwable ex;
+            synchronized (this) {
+                asyncFlusher = null;
+                asyncFlushChunk = null;
+                ex = asyncFlushException;
+                asyncFlushException = null;
+            }
+            if (ex != null) {
+                if (ex instanceof IOException)
+                    throw (IOException) ex;
+                else
+                    throw new IOException(ex);
+            }
+        }
+
+        /** Releases leading chunks that are already flushed, stopping at the chunk still being flushed. */
+        @Override
+        synchronized public int freeUp(int mb) {
+            int mbReleased = 0;
+            while (chunkCount > 0 && mbReleased < mb) {
+                if (firstChunk == asyncFlushChunk)
+                    break;
+
+                mbReleased += MEM_CHUNK_SIZE_MB;
+                chunkCount--;
+                if (chunkCount == 0) {
+                    firstChunk = lastChunk = null;
+                } else {
+                    MemChunk next = firstChunk.next;
+                    firstChunk.next = null;
+                    firstChunk = next;
+                }
+            }
+            return mbReleased;
+        }
+
+        public void activateMemWrite() {
+            writeActivated = true;
+            logger.debug(this + " mem write activated");
+        }
+
+        public void deactivateMemWrite() {
+            writeActivated = false;
+            logger.debug(this + " mem write de-activated");
+        }
+
+        /** Drops all cached chunks and returns their budget. */
+        synchronized public void clear() {
+            budgetCtrl.reserve(this, 0);
+            chunkCount = 0;
+            firstChunk = lastChunk = null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            finishAsyncFlush();
+            clear();
+        }
+
+    }
+
+    /** Positional read/write access to the backing file; tailOffset is the end of the written data. */
+    private class DiskPart implements Closeable {
+        final FileChannel channel;
+        volatile long tailOffset; // updated by both the writer thread and the async flusher thread
+
+        DiskPart(File diskFile, boolean delOnClose) throws IOException {
+            HashSet<StandardOpenOption> opts = new HashSet<StandardOpenOption>(10);
+            // no APPEND: all I/O here is positional, and APPEND cannot be combined with READ
+            opts.add(StandardOpenOption.CREATE);
+            opts.add(StandardOpenOption.WRITE);
+            opts.add(StandardOpenOption.READ);
+            if (delOnClose)
+                opts.add(StandardOpenOption.DELETE_ON_CLOSE);
+            this.channel = FileChannel.open(diskFile.toPath(), opts);
+        }
+
+        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            return channel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+        }
+
+        void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            ByteBuffer buf = ByteBuffer.wrap(bytes, offset, length);
+            long pos = diskOffset;
+            while (buf.hasRemaining()) { // a single positional write is not guaranteed to write everything
+                pos += channel.write(buf, pos);
+            }
+            synchronized (this) {
+                tailOffset = Math.max(diskOffset + length, tailOffset);
+            }
+        }
+
+        synchronized void clear() throws IOException {
+            tailOffset = 0;
+            channel.truncate(0);
+        }
+
+        @Override
+        public void close() throws IOException {
+            channel.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
deleted file mode 100644
index b5a58ca..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
+++ /dev/null
@@ -1,357 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import static org.apache.kylin.common.util.MemoryBudgetController.*;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.BitSet;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemDiskStore implements IGTStore {
-
- private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
-
- final GTInfo info;
- final MemPart memPart;
- final DiskPart diskPart;
-
- public MemDiskStore(GTInfo info, File diskFile, MemoryBudgetController budgetCtrl) throws IOException {
- this.info = info;
- this.memPart = new MemPart(budgetCtrl);
- this.diskPart = new DiskPart(diskFile);
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public IGTStoreWriter rebuild(int shard) throws IOException {
- return new Writer(0);
- }
-
- @Override
- public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
- return new Writer(Math.max(memPart.tailOffset(), diskPart.tailOffset));
- }
-
- @Override
- public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- private class Writer implements IGTStoreWriter {
-
- final DataOutputStream dout;
- boolean memPartActivated = true;
-
- Writer(final long disktOffset) throws IOException {
- memPart.clear();
- diskPart.clear();
-
- memPart.activateWrite();
-
- OutputStream out = new OutputStream() {
- long diskOffset = disktOffset;
- byte[] tmp = new byte[1];
-
- @Override
- public void write(int b) throws IOException {
- tmp[0] = (byte) b;
- write(tmp, 0, 1);
- }
-
- @Override
- public void write(byte[] bytes, int offset, int length) throws IOException {
- while (length > 0) {
- if (memPartActivated) {
- int n = memPart.write(bytes, offset, length, diskOffset);
- offset += n;
- length -= n;
- diskOffset += n;
- if (n == 0) {
- memPartActivated = false;
- }
- } else {
- diskPart.write(diskOffset, bytes, offset, length);
- diskOffset += length;
- }
- }
- }
- };
- dout = new DataOutputStream(out);
- }
-
- @Override
- public void write(GTRowBlock block) throws IOException {
- block.export(dout);
- }
-
- @Override
- public void close() throws IOException {
- memPart.finishAsyncFlush();
- }
- }
-
- private static class MemChunk {
- long diskOffset;
- int length;
- byte[] data;
- MemChunk next;
-
- boolean isFull() {
- return length == data.length;
- }
-
- long headOffset() {
- return diskOffset;
- }
-
- long tailOffset() {
- return diskOffset + length;
- }
-
- int freeSpace() {
- return data.length - length;
- }
- }
-
- private class MemPart implements Closeable, MemoryConsumer {
-
- final MemoryBudgetController budgetCtrl;
-
- // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
- volatile boolean writeActivated;
- MemChunk firstChunk;
- MemChunk lastChunk;
- int chunkCount;
-
- Thread asyncFlusher;
- MemChunk asyncFlushChunk;
- long asyncFlushDiskOffset;
- Throwable asyncFlushException;
-
- MemPart(MemoryBudgetController budgetCtrl) {
- this.budgetCtrl = budgetCtrl;
- }
-
- @SuppressWarnings("unused")
- long headOffset() {
- return firstChunk == null ? 0 : firstChunk.headOffset();
- }
-
- long tailOffset() {
- return lastChunk == null ? 0 : lastChunk.tailOffset();
- }
-
- synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
- if (writeActivated == false)
- return 0;
-
- // write is only expected at the tail
- if (diskOffset != tailOffset())
- return 0;
-
- if (chunkCount == 0 || lastChunk.isFull()) {
- allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
- if (writeActivated == false)
- return 0;
- }
-
- int n = Math.min(lastChunk.freeSpace(), length);
- System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
- lastChunk.length += n;
-
- asyncFlush(lastChunk, diskOffset, n);
-
- return n;
- }
-
- private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
- if (asyncFlusher == null) {
- asyncFlusher = new Thread() {
- public void run() {
- try {
- while (writeActivated) {
- synchronized (asyncFlusher) {
- try {
- asyncFlusher.wait();
- } catch (InterruptedException e) {
- }
- }
- flushToDiskPart();
- }
- flushToDiskPart();
- } catch (Throwable ex) {
- asyncFlushException = ex;
- }
- }
- };
- asyncFlusher.start();
- }
-
- if (asyncFlushChunk == null) {
- asyncFlushChunk = lastChunk;
- asyncFlushDiskOffset = diskOffset;
- }
-
- // flush in batch
- if (diskOffset + n >= asyncFlushDiskOffset + 4096) {
- synchronized (asyncFlusher) {
- asyncFlusher.notify();
- }
- }
- }
-
- private void flushToDiskPart() throws IOException {
- byte[] data;
- int offset = 0;
- int length = 0;
-
- while (true) {
- data = null;
- synchronized (memPart) {
- asyncFlushDiskOffset += length; // bytes written in last loop
- if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
- asyncFlushChunk = asyncFlushChunk.next;
- }
- if (asyncFlushChunk != null) {
- data = asyncFlushChunk.data;
- offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
- length = asyncFlushChunk.length - offset;
- }
- }
-
- if (data == null)
- break;
-
- diskPart.write(asyncFlushDiskOffset, data, offset, length);
- }
- }
-
- private void allocateNewMemChunk(long diskOffset) {
- try {
- budgetCtrl.reserve(this, chunkCount + 1);
-
- MemChunk chunk = new MemChunk();
- chunk.diskOffset = diskOffset;
- chunk.data = new byte[ONE_MB - 48]; // -48 for MemChunk overhead
- if (chunkCount == 0) {
- firstChunk = lastChunk = chunk;
- } else {
- lastChunk.next = chunk;
- lastChunk = chunk;
- }
- chunkCount++;
-
- } catch (NotEnoughBudgetException ex) {
- deactivateWrite();
- }
- }
-
- synchronized public void finishAsyncFlush() throws IOException {
- deactivateWrite();
- if (asyncFlusher != null) {
- synchronized (asyncFlusher) {
- asyncFlusher.notify();
- }
-
- try {
- asyncFlusher.join();
- } catch (InterruptedException e) {
- logger.warn("", e);
- }
- asyncFlusher = null;
-
- if (asyncFlushException != null) {
- if (asyncFlushException instanceof IOException)
- throw (IOException) asyncFlushException;
- else
- throw new IOException(asyncFlushException);
- }
- }
- }
-
- @Override
- synchronized public int freeUp(int mb) {
- int mbReleased = 0;
- while (chunkCount > 0 && mbReleased < mb) {
- if (firstChunk == asyncFlushChunk)
- break;
-
- mbReleased++;
- chunkCount--;
- if (chunkCount == 0)
- firstChunk = lastChunk = null;
- else
- firstChunk = firstChunk.next;
- }
- return mbReleased;
- }
-
- void activateWrite() {
- writeActivated = true;
- }
-
- void deactivateWrite() {
- writeActivated = false;
- }
-
- synchronized public void clear() {
- chunkCount = 0;
- firstChunk = lastChunk = null;
- }
-
- @Override
- public void close() throws IOException {
- finishAsyncFlush();
- clear();
- }
-
- }
-
- private class DiskPart implements Closeable {
- final File diskFile;
- final FileChannel channel;
- long tailOffset;
-
- DiskPart(File diskFile) throws IOException {
- this.diskFile = diskFile;
- this.channel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE, StandardOpenOption.READ);
- }
-
- void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
- tailOffset += length;
- }
-
- void clear() throws IOException {
- tailOffset = 0;
- channel.truncate(0);
- }
-
- @Override
- public void close() throws IOException {
- channel.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index 3607175..2851f6d 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -302,7 +302,7 @@ public class DictGridTableTest {
builder.setColumns( //
DataType.getInstance("timestamp"), //
DataType.getInstance("integer"), //
- DataType.getInstance("varchar"), //
+ DataType.getInstance("varchar(10)"), //
DataType.getInstance("bigint"), //
DataType.getInstance("decimal") //
);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..dd9668e
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -0,0 +1,5 @@
+package org.apache.kylin.storage.gridtable;
+
+/**
+ * Placeholder for GTMemDiskStore tests. It has no test cases yet; the commit message notes the store
+ * is "pending test".
+ */
+public class GTMemDiskStoreTest {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7639f12..7b676f2 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -188,9 +188,9 @@ public class SimpleGridTableTest {
Builder builder = GTInfo.builder();
builder.setCodeSystem(new GTSampleCodeSystem());
builder.setColumns( //
- DataType.getInstance("varchar"), //
- DataType.getInstance("varchar"), //
- DataType.getInstance("varchar"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("varchar(10)"), //
DataType.getInstance("bigint"), //
DataType.getInstance("decimal") //
);