You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/27 14:29:36 UTC
incubator-kylin git commit: MemDiskStore write function done
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-786 [created] 7945a1e49
MemDiskStore write function done
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7945a1e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7945a1e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7945a1e4
Branch: refs/heads/KYLIN-786
Commit: 7945a1e494dd899ab621f4e8082cf47ff7de4456
Parents: 9efb473
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 20:29:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed May 27 20:29:02 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 136 +++++++
.../common/util/MemoryBudgetControllerTest.java | 60 ++++
.../gridtable/memstore/MemDiskStore.java | 357 +++++++++++++++++++
3 files changed, 553 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..dab5607
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,136 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks a fixed memory budget (in MB) shared by a set of {@link MemoryConsumer}s.
+ * <p>
+ * A consumer calls {@link #reserve(MemoryConsumer, int)} with the total number of MB it wants to hold. When the
+ * budget is exhausted, other consumers are asked to {@link MemoryConsumer#freeUp(int)} memory; if they cannot
+ * release enough, {@link NotEnoughBudgetException} is thrown. Even within budget, a reservation waits (polling every
+ * 200 ms) until the JVM reports enough free heap.
+ * <p>
+ * NOTE(review): {@code freeUp()} is invoked while holding this controller's monitor. A consumer that calls
+ * {@code reserve()} while holding its own lock, and whose {@code freeUp()} takes that same lock, can deadlock against
+ * another thread reserving concurrently. Keep {@code freeUp()} implementations lock-light.
+ */
+public class MemoryBudgetController {
+
+    /** Something that holds budgeted memory and can give some of it back on request. */
+    public static interface MemoryConsumer {
+        /**
+         * Tries to release memory.
+         *
+         * @param mb number of MB the controller would like released
+         * @return number of MB actually released (may be 0)
+         */
+        int freeUp(int mb);
+    }
+
+    /** Thrown when a reservation cannot be satisfied within the budget. */
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(String message) {
+            super(message);
+        }
+
+        public NotEnoughBudgetException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /** Reservation held by one consumer; only mutated while holding the controller's monitor. */
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final int ONE_MB = 1024 * 1024;
+    // MB of heap always kept out of any budget
+    public static final int SYSTEM_RESERVED = 200;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    // consumers currently holding a non-zero reservation; only modified while holding this object's monitor
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+
+    // guarded by this object's monitor
+    private int totalReservedMB;
+
+    /**
+     * @param totalBudgetMB the budget in MB, must be positive and currently available in the JVM heap
+     * @throws IllegalArgumentException if {@code totalBudgetMB} is not positive
+     * @throws IllegalStateException    if the JVM does not have that much free heap (beyond SYSTEM_RESERVED)
+     */
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB <= 0) {
+            throw new IllegalArgumentException("Total budget must be positive, got " + totalBudgetMB + " MB");
+        }
+        if (!checkSystemAvailMB(totalBudgetMB)) {
+            throw new IllegalStateException("Total budget of " + totalBudgetMB + " MB is not available, system has " + getSystemAvailMB() + " MB free with " + SYSTEM_RESERVED + " MB reserved");
+        }
+
+        this.totalBudgetMB = totalBudgetMB;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public synchronized int getTotalReservedMB() {
+        return totalReservedMB;
+    }
+
+    /**
+     * Sets the total reservation held by {@code consumer} to {@code requestMB}.
+     * <p>
+     * Growing a reservation may ask other consumers to free up memory and may block until the JVM actually has the
+     * memory available. Shrinking (or keeping) a reservation never blocks.
+     *
+     * @param consumer  the consumer making the reservation
+     * @param requestMB the consumer's new total reservation in MB, must be >= 0
+     * @throws IllegalArgumentException if {@code requestMB} is negative
+     * @throws NotEnoughBudgetException if the budget cannot accommodate the request, or the thread is interrupted
+     *                                  while waiting for memory (the interrupt flag is restored)
+     */
+    public synchronized void reserve(MemoryConsumer consumer, int requestMB) {
+        if (requestMB < 0) {
+            throw new IllegalArgumentException("Negative reservation of " + requestMB + " MB");
+        }
+
+        // a new entry is only registered in 'booking' once it actually holds memory (see updateBooking)
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            entry = new ConsumerEntry(consumer);
+        }
+
+        int delta = requestMB - entry.reservedMB;
+        if (delta > 0) {
+            if (requestMB > totalBudgetMB) {
+                // can never fit; don't evict every other consumer before failing
+                throw new NotEnoughBudgetException("Request of " + requestMB + " MB exceeds total budget of " + totalBudgetMB + " MB");
+            }
+            checkFreeMemoryAndUpdateBooking(entry, delta);
+        } else {
+            updateBooking(entry, delta);
+        }
+    }
+
+    /** Applies {@code delta} MB to both the entry and the total, keeping 'booking' in sync with non-zero entries. */
+    synchronized private void updateBooking(ConsumerEntry entry, int delta) {
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        } else {
+            // (re-)register: the entry may be new, or may have been freed up to zero and dropped while we waited
+            booking.put(entry.consumer, entry);
+        }
+        if (delta < 0) {
+            // budget was returned; wake every thread waiting in checkFreeMemoryAndUpdateBooking
+            this.notifyAll();
+        }
+    }
+
+    /**
+     * Grows {@code consumer}'s reservation by {@code delta} MB. Other consumers are asked to free up the budget
+     * shortfall, then this waits until the JVM actually reports the memory as free.
+     */
+    synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
+        while (true) {
+            // if budget is not enough, ask the other consumers to free up just the shortfall
+            int budgetLeftMB = totalBudgetMB - totalReservedMB;
+            if (delta > budgetLeftMB) {
+                int freeUpToGo = delta - budgetLeftMB;
+                for (ConsumerEntry entry : booking.values()) {
+                    if (entry.consumer != consumer.consumer) {
+                        int mb = entry.consumer.freeUp(freeUpToGo);
+                        updateBooking(entry, -mb);
+                        freeUpToGo -= mb;
+                        if (freeUpToGo <= 0) {
+                            break;
+                        }
+                    }
+                }
+                if (freeUpToGo > 0) {
+                    throw new NotEnoughBudgetException();
+                }
+
+                // encourage the JVM to reclaim what the consumers just released
+                Runtime.getRuntime().gc();
+            }
+
+            if (checkSystemAvailMB(delta)) {
+                break;
+            }
+
+            try {
+                logger.debug("Memory budget has {} MB free, but system only has {} MB free. If this persists, some memory calculation must be wrong.", totalBudgetMB - totalReservedMB, getSystemAvailMB());
+                this.wait(200);
+            } catch (InterruptedException e) {
+                // don't spin on a pending interrupt; give up and let the caller see the interrupt
+                Thread.currentThread().interrupt();
+                throw new NotEnoughBudgetException("Interrupted while waiting for free memory", e);
+            }
+        }
+
+        updateBooking(consumer, delta);
+    }
+
+    private boolean checkSystemAvailMB(int mb) {
+        return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+    }
+
+    /** Returns the MB of heap the JVM can still hand out (max heap minus currently used), capped at Integer.MAX_VALUE. */
+    public static int getSystemAvailMB() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting, Long.MAX_VALUE if unlimited
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        // clamp so an unlimited max heap does not overflow the int result
+        return (int) Math.min(Integer.MAX_VALUE, availableMemory / ONE_MB);
+    }
+
+    /** Returns the largest budget (in MB) the constructor would currently accept. */
+    public static int getMaxPossibleBudget() {
+        return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..bc2eadd
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+    /**
+     * Fills the whole budget with 1 MB consumers, makes one big reservation that can only succeed by freeing all of
+     * them, and finally checks that a reservation nobody can make room for fails.
+     */
+    @Test
+    public void test() {
+        int n = MemoryBudgetController.getMaxPossibleBudget() / 2;
+        // a tiny heap cannot host this test; skip instead of failing in the constructor
+        org.junit.Assume.assumeTrue(n > 0);
+        MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+        ArrayList<OneMB> mbList = new ArrayList<OneMB>();
+        for (int i = 0; i < n; i++) {
+            mbList.add(new OneMB(mbc));
+            assertEquals(mbList.size(), mbc.getTotalReservedMB());
+        }
+
+        // the budget is full, so this reservation must free up every OneMB above
+        mbc.reserve(new OneMB(), n);
+        assertEquals(n, mbc.getTotalReservedMB());
+
+        for (int i = 0; i < n; i++) {
+            assertNull(mbList.get(i).data);
+        }
+
+        // the only holder left is a consumer that cannot release anything
+        try {
+            mbc.reserve(new OneMB(), 1);
+            fail("expected NotEnoughBudgetException");
+        } catch (MemoryBudgetController.NotEnoughBudgetException ex) {
+            // expected
+        }
+        assertEquals(n, mbc.getTotalReservedMB());
+    }
+
+    /** A consumer holding roughly 1 MB, which it releases entirely on the first freeUp() call. */
+    static class OneMB implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        /** Creates a consumer that holds no data and therefore can never free anything. */
+        OneMB() {
+        }
+
+        /** Reserves 1 MB from {@code mbc}, then allocates (close to) 1 MB of heap. */
+        OneMB(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
+        }
+
+        @Override
+        public int freeUp(int mb) {
+            if (data == null) {
+                return 0;
+            }
+            data = null;
+            return 1;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
new file mode 100644
index 0000000..b5a58ca
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
@@ -0,0 +1,357 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IGTStore} that buffers written row blocks in memory chunks, as far as the {@link MemoryBudgetController}
+ * permits, while a background thread mirrors every byte into a disk file. Once no more memory can be reserved (or a
+ * write does not continue the in-memory tail), the remaining bytes of that writer go straight to the disk file.
+ * Memory chunks that are already flushed may be handed back to the budget via {@link MemoryConsumer#freeUp(int)}.
+ * <p>
+ * All offsets are byte positions in the logical stream, which coincide with positions in the disk file.
+ * <p>
+ * NOTE(review): {@link #scan} is not implemented yet and returns null.
+ * <p>
+ * NOTE(review): {@code MemPart.write()} reserves budget while holding the MemPart monitor, and the controller calls
+ * {@code freeUp()} (which takes a MemPart monitor) while holding its own monitor. Two stores sharing one controller
+ * may deadlock under contention; confirm the intended lock ordering.
+ */
+public class MemDiskStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+
+    // wake the async flusher once this many unflushed bytes have accumulated
+    private static final int ASYNC_FLUSH_BATCH_BYTES = 4096;
+
+    final GTInfo info;
+    final MemPart memPart;
+    final DiskPart diskPart;
+
+    public MemDiskStore(GTInfo info, File diskFile, MemoryBudgetController budgetCtrl) throws IOException {
+        this.info = info;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /** Discards all existing content (memory and disk) and returns a writer starting at offset 0. */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        memPart.clear();
+        diskPart.clear();
+        return new Writer(0);
+    }
+
+    /**
+     * Returns a writer that continues after the existing content, which is kept.
+     * NOTE(review): {@code fillLast} is currently ignored.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return new Writer(Math.max(memPart.tailOffset(), diskPart.tailOffset));
+    }
+
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /** Streams exported row blocks into the MemPart first, spilling to the DiskPart once the MemPart refuses. */
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        // only touched by the writing thread
+        boolean memPartActivated = true;
+
+        Writer(final long startOffset) throws IOException {
+            memPart.activateWrite();
+
+            OutputStream out = new OutputStream() {
+                long diskOffset = startOffset;
+                byte[] tmp = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    while (length > 0) {
+                        int n;
+                        if (memPartActivated) {
+                            n = memPart.write(bytes, offset, length, diskOffset);
+                            if (n == 0) {
+                                // out of memory budget or not at the memory tail: the rest goes to disk
+                                memPartActivated = false;
+                            }
+                        } else {
+                            diskPart.write(diskOffset, bytes, offset, length);
+                            n = length;
+                        }
+                        offset += n;
+                        length -= n;
+                        diskOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(out);
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        /** Flushes the stream and waits until every byte held in memory has also reached the disk file. */
+        @Override
+        public void close() throws IOException {
+            dout.flush();
+            memPart.finishAsyncFlush();
+        }
+    }
+
+    /** A fixed-size memory buffer holding {@code length} bytes of the stream starting at {@code diskOffset}. */
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        boolean isFull() {
+            return length == data.length;
+        }
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return diskOffset + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+    }
+
+    /**
+     * Linked list of MemChunks covering a contiguous range of the stream, plus a background thread that copies newly
+     * written bytes into the DiskPart. Chunk and flush state is guarded by this object's monitor.
+     */
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // read & write won't go together, but write() / the async flusher / freeUp() can happen at the same time
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk; // chunk holding the first unflushed byte, null when everything is flushed
+        long asyncFlushDiskOffset; // offset of the first unflushed byte
+        boolean asyncFlushRequested; // set by writers, consumed by the flusher
+        volatile Throwable asyncFlushException;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        @SuppressWarnings("unused")
+        synchronized long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        synchronized long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        /**
+         * Appends up to {@code length} bytes at the memory tail.
+         *
+         * @return number of bytes taken; 0 if writing is deactivated, {@code diskOffset} is not the memory tail, or
+         *         no more memory budget could be reserved (which also deactivates writing)
+         */
+        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            if (!writeActivated)
+                return 0;
+
+            // write is only expected at the tail
+            if (diskOffset != tailOffset())
+                return 0;
+
+            if (chunkCount == 0 || lastChunk.isFull()) {
+                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+                if (!writeActivated)
+                    return 0;
+            }
+
+            int n = Math.min(lastChunk.freeSpace(), length);
+            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+            lastChunk.length += n;
+
+            asyncFlush(lastChunk, diskOffset, n);
+
+            return n;
+        }
+
+        /** Records newly written bytes for the flusher (starting it if needed) and wakes it once a batch is ready. */
+        private void asyncFlush(MemChunk chunk, long diskOffset, int n) {
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            while (awaitFlushRequest()) {
+                                flushToDiskPart();
+                            }
+                            flushToDiskPart(); // final drain once writing is deactivated
+                        } catch (Throwable ex) {
+                            asyncFlushException = ex;
+                        }
+                    }
+                }, "MemDiskStore-AsyncFlusher");
+                asyncFlusher.start();
+            }
+
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = chunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            // flush in batch
+            if (diskOffset + n >= asyncFlushDiskOffset + ASYNC_FLUSH_BATCH_BYTES) {
+                asyncFlushRequested = true;
+                notifyAll();
+            }
+        }
+
+        /**
+         * Blocks the flusher until a batch flush is requested or writing is deactivated. The condition is checked
+         * under this monitor, so wakeups cannot be lost and spurious wakeups are harmless.
+         *
+         * @return true to flush and keep running, false once writing has been deactivated
+         */
+        synchronized boolean awaitFlushRequest() throws InterruptedException {
+            while (writeActivated && !asyncFlushRequested) {
+                wait();
+            }
+            asyncFlushRequested = false;
+            return writeActivated;
+        }
+
+        /** Copies all unflushed bytes to the DiskPart; the disk I/O itself runs outside this monitor. */
+        private void flushToDiskPart() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+            long writeOffset = 0;
+
+            while (true) {
+                data = null;
+                synchronized (this) {
+                    asyncFlushDiskOffset += length; // bytes written in last loop
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                        writeOffset = asyncFlushDiskOffset;
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                // bytes [offset, offset + length) of a chunk never change once written, safe to read unlocked
+                diskPart.write(writeOffset, data, offset, length);
+            }
+        }
+
+        /** Reserves one more MB and appends a chunk starting at {@code diskOffset}; deactivates writing on failure. */
+        private void allocateNewMemChunk(long diskOffset) {
+            try {
+                budgetCtrl.reserve(this, chunkCount + 1);
+
+                MemChunk chunk = new MemChunk();
+                chunk.diskOffset = diskOffset;
+                chunk.data = new byte[ONE_MB - 48]; // -48 for MemChunk overhead
+                if (chunkCount == 0) {
+                    firstChunk = lastChunk = chunk;
+                } else {
+                    lastChunk.next = chunk;
+                    lastChunk = chunk;
+                }
+                chunkCount++;
+
+            } catch (NotEnoughBudgetException ex) {
+                deactivateWrite();
+            }
+        }
+
+        /**
+         * Deactivates writing, lets the async flusher drain every pending byte to the DiskPart, and waits for it.
+         *
+         * @throws IOException if the flusher failed; non-IO failures are wrapped
+         */
+        public void finishAsyncFlush() throws IOException {
+            Thread flusher;
+            synchronized (this) {
+                writeActivated = false;
+                notifyAll(); // wake the flusher so it performs its final drain and exits
+                flusher = asyncFlusher;
+            }
+            if (flusher == null)
+                return;
+
+            // must not hold this monitor while joining: the flusher needs it to finish its final drain
+            boolean interrupted = false;
+            while (true) {
+                try {
+                    flusher.join();
+                    break;
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted while waiting for async flush, keep waiting", e);
+                    interrupted = true;
+                }
+            }
+            if (interrupted)
+                Thread.currentThread().interrupt();
+
+            Throwable ex;
+            synchronized (this) {
+                if (asyncFlusher == flusher)
+                    asyncFlusher = null;
+                ex = asyncFlushException;
+                asyncFlushException = null; // don't fail a later writer with a stale error
+            }
+            if (ex != null) {
+                if (ex instanceof IOException)
+                    throw (IOException) ex;
+                else
+                    throw new IOException(ex);
+            }
+        }
+
+        /** Drops up to {@code mb} already-flushed chunks from the head; never drops the chunk still being flushed. */
+        @Override
+        synchronized public int freeUp(int mb) {
+            int mbReleased = 0;
+            while (chunkCount > 0 && mbReleased < mb) {
+                if (firstChunk == asyncFlushChunk)
+                    break;
+
+                mbReleased++;
+                chunkCount--;
+                if (chunkCount == 0)
+                    firstChunk = lastChunk = null;
+                else
+                    firstChunk = firstChunk.next;
+            }
+            return mbReleased;
+        }
+
+        void activateWrite() {
+            writeActivated = true;
+        }
+
+        synchronized void deactivateWrite() {
+            writeActivated = false;
+            notifyAll(); // let a waiting flusher drain and exit
+        }
+
+        /** Drops all chunks and flush progress, and returns the reserved memory to the budget. */
+        synchronized public void clear() {
+            chunkCount = 0;
+            firstChunk = lastChunk = null;
+            asyncFlushChunk = null;
+            asyncFlushDiskOffset = 0;
+            asyncFlushRequested = false;
+            budgetCtrl.reserve(this, 0);
+        }
+
+        @Override
+        public void close() throws IOException {
+            finishAsyncFlush();
+            clear();
+        }
+
+    }
+
+    /** The disk file mirroring the whole stream; written positionally by both the writer and the async flusher. */
+    private class DiskPart implements Closeable {
+        final File diskFile;
+        final FileChannel channel;
+        // highest offset written so far
+        volatile long tailOffset;
+
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            // no APPEND: writes are positional and may arrive out of order, and FileChannel.open rejects READ + APPEND
+            this.channel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
+        }
+
+        synchronized void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            ByteBuffer buf = ByteBuffer.wrap(bytes, offset, length);
+            long pos = diskOffset;
+            // a single positional write is not guaranteed to write the whole buffer
+            while (buf.hasRemaining()) {
+                pos += channel.write(buf, pos);
+            }
+            tailOffset = Math.max(tailOffset, diskOffset + length);
+        }
+
+        synchronized void clear() throws IOException {
+            tailOffset = 0;
+            channel.truncate(0);
+        }
+
+        @Override
+        public void close() throws IOException {
+            channel.close();
+        }
+    }
+
+}