You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 15:44:13 UTC
incubator-kylin git commit: KYLIN-786,
MemDiskStore that hold cuboid data in mem as much as possible
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 61779f08c -> 4631e40a7
KYLIN-786, MemDiskStore that hold cuboid data in mem as much as possible
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4631e40a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4631e40a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4631e40a
Branch: refs/heads/0.8.0
Commit: 4631e40a7931d93ec681332d2bfbd9b41f0613f5
Parents: 61779f0
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 20:29:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu May 28 21:41:08 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 142 +++++
.../common/util/MemoryBudgetControllerTest.java | 60 ++
.../kylin/storage/gridtable/GTRowBlock.java | 88 +--
.../gridtable/memstore/GTMemDiskStore.java | 573 +++++++++++++++++++
.../storage/gridtable/DictGridTableTest.java | 2 +-
.../storage/gridtable/GTMemDiskStoreTest.java | 74 +++
.../storage/gridtable/SimpleGridTableTest.java | 80 ++-
7 files changed, 960 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..06d3cd2
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,142 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBudgetController {
+
+ public static interface MemoryConsumer {
+ // return number MB released
+ int freeUp(int mb);
+ }
+
+ @SuppressWarnings("serial")
+ public static class NotEnoughBudgetException extends IllegalStateException {
+ }
+
+ private static class ConsumerEntry {
+ final MemoryConsumer consumer;
+ int reservedMB;
+
+ ConsumerEntry(MemoryConsumer consumer) {
+ this.consumer = consumer;
+ }
+ }
+
+ public static final int ONE_MB = 1024 * 1024;
+ public static final int SYSTEM_RESERVED = 200;
+
+ private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+ // all budget numbers are in MB
+ private final int totalBudgetMB;
+ private final AtomicInteger totalReservedMB;
+ private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+
+
+ public MemoryBudgetController(int totalBudgetMB) {
+ if (totalBudgetMB < 0)
+ throw new IllegalArgumentException();
+ if (checkSystemAvailMB(totalBudgetMB) == false)
+ throw new IllegalStateException();
+
+ this.totalBudgetMB = totalBudgetMB;
+ this.totalReservedMB = new AtomicInteger();
+ }
+
+ public int getTotalBudgetMB() {
+ return totalBudgetMB;
+ }
+
+ public int getTotalReservedMB() {
+ return totalReservedMB.get();
+ }
+
+ public int getRemainingBudgetMB() {
+ return totalBudgetMB - totalReservedMB.get();
+ }
+
+ public void reserve(MemoryConsumer consumer, int requestMB) {
+ if (totalBudgetMB == 0 && requestMB > 0)
+ throw new NotEnoughBudgetException();
+
+ ConsumerEntry entry = booking.get(consumer);
+ if (entry == null) {
+ booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
+ entry = booking.get(consumer);
+ }
+
+ int delta = requestMB - entry.reservedMB;
+
+ if (delta > 0) {
+ checkFreeMemoryAndUpdateBooking(entry, delta);
+ } else {
+ updateBooking(entry, delta);
+ }
+ }
+
+ synchronized private void updateBooking(ConsumerEntry entry, int delta) {
+ totalReservedMB.addAndGet(delta);
+ entry.reservedMB += delta;
+ if (entry.reservedMB == 0) {
+ booking.remove(entry.consumer);
+ }
+ if (delta < 0) {
+ this.notify();
+ }
+ logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ }
+
+ /**
+  * Books {@code delta} additional MB for the given entry.
+  * While the remaining budget is smaller than {@code delta}, registered consumers are asked to
+  * free up memory; once the budget allows it, the method waits until the JVM reports enough
+  * available heap (see {@link #checkSystemAvailMB(int)}) before recording the booking.
+  *
+  * @param consumer the booking entry that requests more memory
+  * @param delta    number of additional MB requested, expected to be positive
+  * @throws NotEnoughBudgetException if the consumers could not release enough memory
+  */
+ private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
+     while (true) {
+         // if budget is not enough, try free up
+         while (delta > totalBudgetMB - totalReservedMB.get()) {
+             int freeUpToGo = delta;
+             for (ConsumerEntry entry : booking.values()) {
+                 int mb = entry.consumer.freeUp(freeUpToGo);
+                 updateBooking(entry, -mb);
+                 freeUpToGo -= mb;
+                 if (freeUpToGo <= 0)
+                     break;
+             }
+             if (freeUpToGo > 0)
+                 throw new NotEnoughBudgetException();
+         }
+
+         if (checkSystemAvailMB(delta))
+             break;
+
+         try {
+             logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+             // Object.wait() must be called while owning the monitor, otherwise it throws
+             // IllegalMonitorStateException. The lock is taken only around the wait itself so that
+             // freeUp() above is still invoked without holding the controller's monitor.
+             synchronized (this) {
+                 this.wait(200);
+             }
+         } catch (InterruptedException e) {
+             logger.error("Interrupted while wait free memory", e);
+         }
+     }
+
+     updateBooking(consumer, delta);
+ }
+
+ private boolean checkSystemAvailMB(int mb) {
+ return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+ }
+
+ public static int getSystemAvailMB() {
+ Runtime runtime = Runtime.getRuntime();
+ long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+ long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+ long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+ long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+ long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+ int availMB = (int) (availableMemory / ONE_MB);
+ return availMB;
+ }
+
+ public static int getMaxPossibleBudget() {
+ return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..bc2eadd
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+ @Test
+ public void test() {
+ int n = MemoryBudgetController.getMaxPossibleBudget() / 2;
+ MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+ ArrayList<OneMB> mbList = new ArrayList<OneMB>();
+ for (int i = 0; i < n; i++) {
+ mbList.add(new OneMB(mbc));
+ assertEquals(mbList.size(), mbc.getTotalReservedMB());
+ }
+
+ mbc.reserve(new OneMB(), n);
+
+ for (int i = 0; i < n; i++) {
+ assertEquals(null, mbList.get(i).data);
+ }
+
+ try {
+ mbc.reserve(new OneMB(), 1);
+ fail();
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ }
+
+ class OneMB implements MemoryBudgetController.MemoryConsumer {
+
+ byte[] data;
+
+ OneMB() {
+ }
+
+ OneMB(MemoryBudgetController mbc) {
+ mbc.reserve(this, 1);
+ data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
+ }
+
+ @Override
+ public int freeUp(int mb) {
+ if (data != null) {
+ data = null;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 4a68659..eed5698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.gridtable;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -40,36 +41,36 @@ public class GTRowBlock {
this.cellBlocks[i] = new ByteArray();
}
}
-
+
public int getSequenceId() {
return seqId;
}
-
+
public ByteArray getPrimaryKey() {
return primaryKey;
}
-
+
public ByteArray getCellBlock(int i) {
return cellBlocks[i];
}
-
+
public Writer getWriter() {
return new Writer();
}
public class Writer {
ByteBuffer[] cellBlockBuffers;
-
+
Writer() {
cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
for (int i = 0; i < cellBlockBuffers.length; i++) {
cellBlockBuffers[i] = cellBlocks[i].asBuffer();
}
}
-
+
public void copyFrom(GTRowBlock other) {
assert info == other.info;
-
+
seqId = other.seqId;
nRows = other.nRows;
primaryKey.copyFrom(other.primaryKey);
@@ -89,7 +90,7 @@ public class GTRowBlock {
}
nRows++;
}
-
+
public void readyForFlush() {
for (int i = 0; i < cellBlocks.length; i++) {
cellBlocks[i].setLength(cellBlockBuffers[i].position());
@@ -104,21 +105,21 @@ public class GTRowBlock {
}
}
}
-
+
public Reader getReader() {
return new Reader(info.colBlocksAll);
}
-
+
public Reader getReader(BitSet selectedColBlocks) {
return new Reader(selectedColBlocks);
}
-
+
public class Reader {
int cur;
ByteBuffer primaryKeyBuffer;
ByteBuffer[] cellBlockBuffers;
BitSet selectedColBlocks;
-
+
Reader(BitSet selectedColBlocks) {
primaryKeyBuffer = primaryKey.asBuffer();
cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
@@ -127,15 +128,15 @@ public class GTRowBlock {
}
this.selectedColBlocks = selectedColBlocks;
}
-
+
public boolean hasNext() {
return cur < nRows;
}
-
+
public void fetchNext(GTRecord result) {
if (hasNext() == false)
throw new IllegalArgumentException();
-
+
for (int c = selectedColBlocks.nextSetBit(0); c >= 0; c = selectedColBlocks.nextSetBit(c + 1)) {
result.loadCellBlock(c, cellBlockBuffers[c]);
}
@@ -153,7 +154,7 @@ public class GTRowBlock {
return copy;
}
-
+
public boolean isEmpty() {
return nRows == 0;
}
@@ -164,7 +165,7 @@ public class GTRowBlock {
else
return nRows > 0;
}
-
+
public int getNumberOfRows() {
return nRows;
}
@@ -172,36 +173,41 @@ public class GTRowBlock {
public void setNumberOfRows(int nRows) {
this.nRows = nRows;
}
-
- // TODO export / load should optimize for disabled row block
+ // ============================================================================
+
public int exportLength() {
- int len = 4 + 4 + (4 + primaryKey.length());
+ int len = 4; // seq Id
+ if (info.isRowBlockEnabled())
+ len += 4; // nRows
+ len += 4 + primaryKey.length(); // PK byte array
for (ByteArray array : cellBlocks) {
- len += 4 + array.length();
+ len += 4 + array.length(); // cell block byte array
}
return len;
}
- public void export(DataOutputStream dataOutputStream) throws IOException {
- dataOutputStream.writeInt(seqId);
- dataOutputStream.writeInt(nRows);
- export(dataOutputStream, primaryKey);
+ /** write data to given output stream, like serialize */
+ public void export(DataOutputStream out) throws IOException {
+ out.writeInt(seqId);
+ if (info.isRowBlockEnabled())
+ out.writeInt(nRows);
+ export(out, primaryKey);
for (ByteArray cb : cellBlocks) {
- export(dataOutputStream, cb);
+ export(out, cb);
}
}
- private void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
- dataOutputStream.writeInt(array.length());
- dataOutputStream.write(array.array(), array.offset(), array.length());
+ private void export(DataOutputStream out, ByteArray array) throws IOException {
+ out.writeInt(array.length());
+ out.write(array.array(), array.offset(), array.length());
}
-
/** write data to given buffer, like serialize */
public void export(ByteBuffer buf) {
buf.putInt(seqId);
- buf.putInt(nRows);
+ if (info.isRowBlockEnabled())
+ buf.putInt(nRows);
export(primaryKey, buf);
for (ByteArray cb : cellBlocks) {
export(cb, buf);
@@ -212,11 +218,29 @@ public class GTRowBlock {
buf.putInt(array.length());
buf.put(array.array(), array.offset(), array.length());
}
+
+ /** read data from given input stream, like deserialize */
+ public void importFrom(DataInputStream in) throws IOException {
+ seqId = in.readInt();
+ nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+ importFrom(in, primaryKey);
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ ByteArray cb = cellBlocks[i];
+ importFrom(in, cb);
+ }
+ }
+
+ /**
+  * Reads one length-prefixed byte array from the stream into the backing array of {@code result}.
+  * The layout mirrors what {@code export(DataOutputStream, ByteArray)} writes: a 4-byte length
+  * followed by that many bytes.
+  *
+  * @throws java.io.EOFException if the stream ends before the announced number of bytes was read
+  * @throws IOException on any other read failure
+  */
+ private void importFrom(DataInputStream in, ByteArray result) throws IOException {
+     byte[] data = result.array();
+     int len = in.readInt();
+     // read() is allowed to return fewer bytes than requested (e.g. at a buffer boundary), which
+     // would silently leave the tail of the array stale; readFully() keeps reading until all
+     // len bytes have arrived.
+     in.readFully(data, 0, len);
+     result.set(data, 0, len);
+ }
/** change pointers to point to data in given buffer, UNLIKE deserialize */
public void load(ByteBuffer buf) {
seqId = buf.getInt();
- nRows = buf.getInt();
+ nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
load(primaryKey, buf);
for (int i = 0; i < info.colBlocks.length; i++) {
ByteArray cb = cellBlocks[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
new file mode 100644
index 0000000..4ae4f5c
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -0,0 +1,573 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GTMemDiskStore implements IGTStore, Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+ private static final int STREAM_BUFFER_SIZE = 8192;
+ private static final int MEM_CHUNK_SIZE_MB = 1;
+
+ final GTInfo info;
+ final MemPart memPart;
+ final DiskPart diskPart;
+
+ public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+ this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+ }
+
+ public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+ this(info, budgetCtrl, diskFile, false);
+ }
+
+ private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
+ this.info = info;
+ this.memPart = new MemPart(budgetCtrl);
+ this.diskPart = new DiskPart(diskFile);
+
+ if (delOnExit)
+ diskFile.deleteOnExit();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public IGTStoreWriter rebuild(int shard) throws IOException {
+ return new Writer(0);
+ }
+
+ @Override
+ public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+ return new Writer(diskPart.tailOffset);
+ }
+
+ @Override
+ public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+ return new Reader();
+ }
+
+ @Override
+ public void close() throws IOException {
+ memPart.close();
+ diskPart.close();
+ }
+
+ @Override
+ public String toString() {
+ return "MemDiskStore@" + this.hashCode();
+ }
+
+ private class Reader implements IGTStoreScanner {
+
+ final DataInputStream din;
+ long diskOffset = 0;
+ long memRead = 0;
+ long diskRead = 0;
+ int nReadCalls = 0;
+
+ GTRowBlock block = GTRowBlock.allocate(info);
+ GTRowBlock next = null;
+
+ Reader() throws IOException {
+ diskPart.openRead();
+ logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+
+ InputStream in = new InputStream() {
+ byte[] tmp = new byte[1];
+ MemChunk memChunk;
+
+ /**
+  * Reads a single byte by delegating to the bulk read.
+  *
+  * @return the byte as an unsigned value in the range 0..255, or -1 when no more data is available
+  */
+ @Override
+ public int read() throws IOException {
+     int n = read(tmp, 0, 1);
+     if (n <= 0)
+         return -1;
+     else
+         // mask to 0..255: a plain (int) cast sign-extends, so 0xFF would be returned as -1 and
+         // be mistaken for end-of-stream, and any byte >= 0x80 would come back negative
+         return tmp[0] & 0xFF;
+ }
+
+ /**
+  * Reads up to {@code len} bytes starting at the current {@code diskOffset}, taking them from the
+  * in-memory chunks where a chunk covers the offset and from the disk file otherwise.
+  *
+  * @return the number of bytes copied into {@code b}, or -1 if no data is available
+  */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+     nReadCalls++;
+     if (available() <= 0)
+         return -1;
+
+     if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
+         memChunk = memPart.seekMemChunk(diskOffset);
+     }
+
+     int lenToGo = Math.min(available(), len);
+
+     int nRead = 0;
+     boolean hitEnd = false;
+     while (lenToGo > 0) {
+         int n;
+         if (memChunk != null) {
+             if (memChunk.headOffset() > diskOffset) {
+                 // current offset lies before this chunk, fall back to disk
+                 memChunk = null;
+                 continue;
+             }
+             if (diskOffset >= memChunk.tailOffset()) {
+                 // this chunk is exhausted, move on (next may be null, which means disk)
+                 memChunk = memChunk.next;
+                 continue;
+             }
+             int chunkOffset = (int) (diskOffset - memChunk.headOffset());
+             n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
+             System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+             memRead += n;
+         } else {
+             n = diskPart.read(diskOffset, b, off, lenToGo);
+             if (n <= 0) {
+                 // the channel reports end-of-file (-1) or no progress; without this guard the
+                 // counters below would move backwards and the loop would not terminate properly
+                 hitEnd = true;
+                 break;
+             }
+             diskRead += n;
+         }
+         lenToGo -= n;
+         nRead += n;
+         off += n;
+         diskOffset += n;
+     }
+     // report end-of-stream instead of 0 so that callers such as readFully() do not spin
+     return (nRead == 0 && hitEnd) ? -1 : nRead;
+ }
+
+ /**
+  * Returns how many bytes remain between the current read offset and the tail of the disk part,
+  * capped at {@code Integer.MAX_VALUE} and never negative.
+  */
+ @Override
+ public int available() throws IOException {
+     long remaining = diskPart.tailOffset - diskOffset;
+     // a plain (int) cast wraps once more than 2 GB remain, which can yield a negative or
+     // arbitrary value and make the reader stop early; clamp into the int range instead
+     return (int) Math.max(0L, Math.min(remaining, (long) Integer.MAX_VALUE));
+ }
+ };
+
+ din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ try {
+ if (din.available() > 0) {
+ block.importFrom(din);
+ next = block;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return next != null;
+ }
+
+ @Override
+ public GTRowBlock next() {
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+ GTRowBlock r = next;
+ next = null;
+ return r;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ din.close();
+ diskPart.closeRead();
+ logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+ }
+
+ }
+
+ private class Writer implements IGTStoreWriter {
+
+ final DataOutputStream dout;
+ long diskOffset;
+ long memWrite = 0;
+ long diskWrite = 0;
+ int nWriteCalls;
+
+ Writer(long startOffset) throws IOException {
+ diskOffset = 0; // TODO does not support append yet
+ memPart.clear();
+ diskPart.clear();
+ diskPart.openWrite(false);
+ logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+
+ memPart.activateMemWrite();
+
+ OutputStream out = new OutputStream() {
+ byte[] tmp = new byte[1];
+ boolean memPartActivated = true;
+
+ @Override
+ public void write(int b) throws IOException {
+ tmp[0] = (byte) b;
+ write(tmp, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ nWriteCalls++;
+ while (length > 0) {
+ int n;
+ if (memPartActivated) {
+ n = memPart.write(bytes, offset, length, diskOffset);
+ memWrite += n;
+ if (n == 0) {
+ memPartActivated = false;
+ }
+ } else {
+ n = diskPart.write(diskOffset, bytes, offset, length);
+ diskWrite += n;
+ }
+ offset += n;
+ length -= n;
+ diskOffset += n;
+ }
+ }
+ };
+ dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public void write(GTRowBlock block) throws IOException {
+ block.export(dout);
+ }
+
+ @Override
+ public void close() throws IOException {
+ dout.close();
+ memPart.finishAsyncFlush();
+ diskPart.closeWrite();
+ assert diskOffset == diskPart.tailOffset;
+ logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ }
+ }
+
+ private static class MemChunk {
+ long diskOffset;
+ int length;
+ byte[] data;
+ MemChunk next;
+
+ boolean isFull() {
+ return length == data.length;
+ }
+
+ long headOffset() {
+ return diskOffset;
+ }
+
+ long tailOffset() {
+ return diskOffset + length;
+ }
+
+ int freeSpace() {
+ return data.length - length;
+ }
+ }
+
+ private class MemPart implements Closeable, MemoryConsumer {
+
+ final MemoryBudgetController budgetCtrl;
+
+ // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+ volatile boolean writeActivated;
+ MemChunk firstChunk;
+ MemChunk lastChunk;
+ int chunkCount;
+
+ Thread asyncFlusher;
+ MemChunk asyncFlushChunk;
+ long asyncFlushDiskOffset;
+ Throwable asyncFlushException;
+
+ MemPart(MemoryBudgetController budgetCtrl) {
+ this.budgetCtrl = budgetCtrl;
+ }
+
+ long headOffset() {
+ return firstChunk == null ? 0 : firstChunk.headOffset();
+ }
+
+ long tailOffset() {
+ return lastChunk == null ? 0 : lastChunk.tailOffset();
+ }
+
+ synchronized public MemChunk seekMemChunk(long diskOffset) {
+ MemChunk c = firstChunk;
+ while (c != null && c.headOffset() <= diskOffset) {
+ if (diskOffset < c.tailOffset())
+ break;
+ c = c.next;
+ }
+ return c;
+ }
+
+ public int write(byte[] bytes, int offset, int length, long diskOffset) {
+ int needMoreMem = 0;
+
+ synchronized (this) {
+ if (writeActivated == false)
+ return 0;
+
+ // write is only expected at the tail
+ if (diskOffset != tailOffset())
+ return 0;
+
+ if (chunkCount == 0 || lastChunk.isFull())
+ needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
+ }
+
+ // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+ if (needMoreMem > 0) {
+ try {
+ budgetCtrl.reserve(this, needMoreMem);
+ } catch (NotEnoughBudgetException ex) {
+ deactivateMemWrite();
+ return 0;
+ }
+ }
+
+ synchronized (this) {
+ if (needMoreMem > 0) {
+ MemChunk chunk = new MemChunk();
+ chunk.diskOffset = diskOffset;
+ chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = chunk;
+ } else {
+ lastChunk.next = chunk;
+ lastChunk = chunk;
+ }
+ chunkCount++;
+ }
+
+ int n = Math.min(lastChunk.freeSpace(), length);
+ System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+ lastChunk.length += n;
+
+ if (n > 0)
+ asyncFlush(lastChunk, diskOffset, n);
+
+ return n;
+ }
+ }
+
+ private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+ if (asyncFlushChunk == null) {
+ asyncFlushChunk = lastChunk;
+ asyncFlushDiskOffset = diskOffset;
+ }
+
+ if (asyncFlusher == null) {
+ asyncFlusher = new Thread() {
+ public void run() {
+ asyncFlushException = null;
+ logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+ try {
+ while (writeActivated) {
+ flushToDisk();
+ Thread.sleep(10);
+ }
+ flushToDisk();
+ } catch (Throwable ex) {
+ asyncFlushException = ex;
+ }
+ logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+ }
+ };
+ asyncFlusher.start();
+ }
+ }
+
+ private void flushToDisk() throws IOException {
+ byte[] data;
+ int offset = 0;
+ int length = 0;
+ int flushedLen = 0;
+
+ while (true) {
+ data = null;
+ synchronized (memPart) {
+ asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+ // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+ if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+ asyncFlushChunk = asyncFlushChunk.next;
+ }
+ if (asyncFlushChunk != null) {
+ data = asyncFlushChunk.data;
+ offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+ length = asyncFlushChunk.length - offset;
+ }
+ }
+
+ if (data == null)
+ break;
+
+ flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+ }
+ }
+
+ /**
+  * Deactivates memory writes and waits for the async flusher thread, if one was started, to end.
+  * Any failure recorded by the flusher is rethrown to the caller.
+  *
+  * @throws IOException the exception captured by the flusher thread, wrapped if it was not an IOException
+  */
+ public void finishAsyncFlush() throws IOException {
+     deactivateMemWrite();
+     if (asyncFlusher != null) {
+         // Keep waiting even when interrupted: giving up early would drop the reference to a
+         // flusher that may still be writing to the disk part. The interrupt status is restored
+         // afterwards so the caller can still observe it.
+         boolean interrupted = false;
+         while (true) {
+             try {
+                 asyncFlusher.join();
+                 break;
+             } catch (InterruptedException e) {
+                 logger.warn("Interrupted while waiting for async flush to finish, will keep waiting", e);
+                 interrupted = true;
+             }
+         }
+         if (interrupted)
+             Thread.currentThread().interrupt();
+
+         asyncFlusher = null;
+         asyncFlushChunk = null;
+
+         if (asyncFlushException != null) {
+             if (asyncFlushException instanceof IOException)
+                 throw (IOException) asyncFlushException;
+             else
+                 throw new IOException(asyncFlushException);
+         }
+     }
+ }
+
+ @Override
+ synchronized public int freeUp(int mb) {
+ int mbReleased = 0;
+ while (chunkCount > 0 && mbReleased < mb) {
+ if (firstChunk == asyncFlushChunk)
+ break;
+
+ mbReleased += MEM_CHUNK_SIZE_MB;
+ chunkCount--;
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = null;
+ } else {
+ MemChunk next = firstChunk.next;
+ firstChunk.next = null;
+ firstChunk = next;
+ }
+ }
+ return mbReleased;
+ }
+
+ public void activateMemWrite() {
+ writeActivated = true;
+ logger.debug(GTMemDiskStore.this + " mem write activated");
+ }
+
+ public void deactivateMemWrite() {
+ writeActivated = false;
+ logger.debug(GTMemDiskStore.this + " mem write de-activated");
+ }
+
+ synchronized public void clear() {
+ budgetCtrl.reserve(this, 0);
+ chunkCount = 0;
+ firstChunk = lastChunk = null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ finishAsyncFlush();
+ clear();
+ }
+
+ @Override
+ public String toString() {
+ return GTMemDiskStore.this.toString();
+ }
+
+ }
+
+ private class DiskPart implements Closeable {
+ final File diskFile;
+ FileChannel writeChannel;
+ FileChannel readChannel;
+ long tailOffset;
+
+ DiskPart(File diskFile) throws IOException {
+ this.diskFile = diskFile;
+ this.tailOffset = diskFile.length();
+ logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+ }
+
+ public void openRead() throws IOException {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ tailOffset = diskFile.length();
+ }
+
+ public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ }
+
+ public void closeRead() throws IOException {
+ if (readChannel != null) {
+ readChannel.close();
+ readChannel = null;
+ }
+ }
+
+ public void openWrite(boolean append) throws IOException {
+ if (append) {
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+ tailOffset = diskFile.length();
+ } else {
+ diskFile.delete();
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ tailOffset = 0;
+ }
+ }
+
+ public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ tailOffset = Math.max(diskOffset + n, tailOffset);
+ return n;
+ }
+
+ public void closeWrite() throws IOException {
+ if (writeChannel != null) {
+ writeChannel.close();
+ writeChannel = null;
+ }
+ }
+
+ public void clear() throws IOException {
+ diskFile.delete();
+ tailOffset = 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeWrite();
+ closeRead();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index af47b89..374f497 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -302,7 +302,7 @@ public class DictGridTableTest {
builder.setColumns( //
DataType.getInstance("timestamp"), //
DataType.getInstance("integer"), //
- DataType.getInstance("varchar"), //
+ DataType.getInstance("varchar(10)"), //
DataType.getInstance("bigint"), //
DataType.getInstance("decimal") //
);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..b7678b8
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.storage.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.junit.Test;
+
+public class GTMemDiskStoreTest {
+
+ final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+ final GTInfo info = SimpleGridTableTest.advancedInfo();
+ final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+ @Test
+ public void testSingleThreadWriteRead() throws IOException {
+ long start = System.currentTimeMillis();
+ verifyOneTableWriteAndRead();
+ long end = System.currentTimeMillis();
+ System.out.println("Cost " + (end - start) + " millis");
+ }
+
+ /**
+  * Runs the write-then-read verification concurrently in several threads that share one
+  * budget controller, and fails if any of the workers failed.
+  */
+ @Test
+ public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+     long start = System.currentTimeMillis();
+
+     // failures raised inside worker threads never reach JUnit on their own; collect them here
+     final java.util.Queue<Throwable> failures = new java.util.concurrent.ConcurrentLinkedQueue<Throwable>();
+
+     int nThreads = 5;
+     Thread[] t = new Thread[nThreads];
+     for (int i = 0; i < nThreads; i++) {
+         t[i] = new Thread() {
+             public void run() {
+                 try {
+                     verifyOneTableWriteAndRead();
+                 } catch (Throwable ex) {
+                     // Throwable rather than Exception: a failed assertEquals throws AssertionError
+                     ex.printStackTrace();
+                     failures.add(ex);
+                 }
+             }
+         };
+         t[i].start();
+     }
+     for (int i = 0; i < nThreads; i++) {
+         t[i].join();
+     }
+
+     long end = System.currentTimeMillis();
+     System.out.println("Cost " + (end - start) + " millis");
+
+     assertTrue(failures.size() + " of " + nThreads + " worker threads failed, first failure: " + failures.peek(), failures.isEmpty());
+ }
+
+ /** Creates a fresh mem/disk store on the shared budget controller, verifies it, and closes it. */
+ private void verifyOneTableWriteAndRead() throws IOException {
+     GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
+     try {
+         GridTable table = new GridTable(info, store);
+         verifyWriteAndRead(table);
+     } finally {
+         // the store is Closeable; closing it also runs its memory part's clear(), which
+         // reserves 0 MB on the budget controller, and closes any file channel still open
+         store.close();
+     }
+ }
+
+ /**
+  * Writes the whole mock data set into the table, scans it back and checks that the scan
+  * returns exactly the same records in the same order.
+  */
+ private void verifyWriteAndRead(GridTable table) throws IOException {
+     GTInfo info = table.getInfo();
+
+     GTBuilder builder = table.rebuild();
+     for (GTRecord r : data) {
+         builder.write(r);
+     }
+     builder.close();
+
+     IGTScanner scanner = table.scan(new GTScanRequest(info));
+     int i = 0;
+     try {
+         for (GTRecord r : scanner) {
+             assertEquals(data.get(i++), r);
+         }
+     } finally {
+         // close even when an assertion fails, so the read channel is not leaked
+         scanner.close();
+     }
+     // without this check a scan that returns too few (or zero) records would pass silently
+     assertEquals(data.size(), i);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7639f12..05e61f8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -4,9 +4,12 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.BitSet;
+import java.util.List;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.storage.gridtable.GTInfo.Builder;
import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
@@ -115,56 +118,81 @@ public class SimpleGridTableTest {
}
static GTBuilder rebuild(GridTable table) throws IOException {
- GTRecord r = new GTRecord(table.getInfo());
GTBuilder builder = table.rebuild();
-
- builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+ builder.write(rec);
+ }
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
return builder;
}
+
+ static List<GTRecord> mockupData(GTInfo info, int nRows) {
+ List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+ int round = nRows / 10;
+ for (int i = 0; i < round; i++) {
+ String d_01_14 = datePlus("2015-01-14", i * 4);
+ String d_01_15 = datePlus("2015-01-15", i * 4);
+ String d_01_16 = datePlus("2015-01-16", i * 4);
+ String d_01_17 = datePlus("2015-01-17", i * 4);
+ result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ }
+ return result;
+ }
+
+ private static String datePlus(String date, int plusDays) {
+ long millis = DateFormat.stringToMillis(date);
+ millis += (1000L * 3600L * 24L) * plusDays;
+ return DateFormat.formatToDateStr(millis);
+ }
+
+ // Creates a new record for the given table info and fills its five columns
+ // in the order: date, name, category, amount, price.
+ private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+ return new GTRecord(info).setValues(date, name, category, amount, price);
+ }
static void rebuildViaAppend(GridTable table) throws IOException {
- GTRecord r = new GTRecord(table.getInfo());
+ List<GTRecord> data = mockupData(table.getInfo(), 10);
GTBuilder builder;
+ int i = 0;
builder = table.append();
- builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
@@ -188,9 +216,9 @@ public class SimpleGridTableTest {
Builder builder = GTInfo.builder();
builder.setCodeSystem(new GTSampleCodeSystem());
builder.setColumns( //
- DataType.getInstance("varchar"), //
- DataType.getInstance("varchar"), //
- DataType.getInstance("varchar"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("varchar(10)"), //
DataType.getInstance("bigint"), //
DataType.getInstance("decimal") //
);