You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/01 15:19:45 UTC
[1/2] incubator-kylin git commit: KYLIN-806,
refactor in-mem cube classes to under job.inmemcubing package
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 731f33b0c -> 765dfca8c
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
deleted file mode 100644
index 69b3bb9..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
-
-public class MemoryBudgetControllerTest {
-
- @Test
- public void test() {
- final int n = MemoryBudgetController.getSystemAvailMB() / 2;
- final MemoryBudgetController mbc = new MemoryBudgetController(n);
-
- ArrayList<Consumer> mbList = new ArrayList<Consumer>();
- for (int i = 0; i < n; i++) {
- mbList.add(new Consumer(mbc));
- assertEquals(mbList.size(), mbc.getTotalReservedMB());
- }
-
- // a's reservation will free up all the previous
- final Consumer a = new Consumer();
- mbc.reserve(a, n);
- for (int i = 0; i < n; i++) {
- assertEquals(null, mbList.get(i).data);
- }
-
- // cancel a in 2 seconds
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- mbc.reserve(a, 0);
- }
- }.start();
-
- // b will success after some wait
- long bWaitStart = System.currentTimeMillis();
- final Consumer b = new Consumer();
- mbc.reserveInsist(b, n);
- assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
-
- try {
- mbc.reserve(a, 1);
- fail();
- } catch (NotEnoughBudgetException ex) {
- // expected
- }
- }
-
- class Consumer implements MemoryBudgetController.MemoryConsumer {
-
- byte[] data;
-
- Consumer() {
- }
-
- Consumer(MemoryBudgetController mbc) {
- mbc.reserve(this, 1);
- data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
- }
-
- @Override
- public int freeUp(int mb) {
- if (data != null) {
- data = null;
- return 1;
- } else {
- return 0;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 88db599..bcf0f29 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.storage.gridtable;
import static org.junit.Assert.*;
@@ -130,7 +147,7 @@ public class SimpleGridTableTest {
return builder;
}
- static List<GTRecord> mockupData(GTInfo info, int nRows) {
+ public static List<GTRecord> mockupData(GTInfo info, int nRows) {
List<GTRecord> result = new ArrayList<GTRecord>(nRows);
int round = nRows / 10;
for (int i = 0; i < round; i++) {
@@ -199,13 +216,13 @@ public class SimpleGridTableTest {
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
}
- static GTInfo basicInfo() {
+ public static GTInfo basicInfo() {
Builder builder = infoBuilder();
GTInfo info = builder.build();
return info;
}
- static GTInfo advancedInfo() {
+ public static GTInfo advancedInfo() {
Builder builder = infoBuilder();
builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0), setOf(1, 2), setOf(3, 4) });
builder.enableRowBlock(4);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index ee3826a..6cfb32f 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.storage.gridtable;
import static org.junit.Assert.*;
[2/2] incubator-kylin git commit: KYLIN-806,
refactor in-mem cube classes to under job.inmemcubing package
Posted by li...@apache.org.
KYLIN-806, refactor in-mem cube classes to under job.inmemcubing package
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/765dfca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/765dfca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/765dfca8
Branch: refs/heads/0.8.0
Commit: 765dfca8ca4374bfd811c2aae485a182e420c042
Parents: 731f33b
Author: Yang Li <li...@apache.org>
Authored: Mon Jun 1 21:18:24 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Jun 1 21:19:18 2015 +0800
----------------------------------------------------------------------
.../job/inmemcubing/ConcurrentDiskStore.java | 22 +
.../kylin/job/inmemcubing/ICuboidWriter.java | 17 +
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 6 +-
.../kylin/job/inmemcubing/MemDiskStore.java | 678 +++++++++++++++++++
.../job/inmemcubing/MemoryBudgetController.java | 250 +++++++
.../job/inmemcubing/GTMemDiskStoreTest.java | 98 +++
.../inmemcubing/MemoryBudgetControllerTest.java | 98 +++
.../gridtable/memstore/GTMemDiskStore.java | 661 ------------------
.../memstore/MemoryBudgetController.java | 233 -------
.../gridtable/AggregationCacheMemSizeTest.java | 31 +-
.../storage/gridtable/DictGridTableTest.java | 17 +
.../storage/gridtable/GTMemDiskStoreTest.java | 74 --
.../gridtable/MemoryBudgetControllerTest.java | 81 ---
.../storage/gridtable/SimpleGridTableTest.java | 23 +-
.../gridtable/SimpleInvertedIndexTest.java | 17 +
15 files changed, 1247 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
new file mode 100644
index 0000000..cca1040
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+public class ConcurrentDiskStore {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
index c05f217..91d4a2a 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.job.inmemcubing;
import org.apache.kylin.storage.gridtable.GTRecord;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 567716a..1207466 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -55,8 +55,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.GridTable;
import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +158,7 @@ public class InMemCubeBuilder implements Runnable {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
// could use a real budget controller, but experiment shows write directly to disk is the same fast
// GTMemDiskStore store = new GTMemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
- GTMemDiskStore store = new GTMemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+ MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
GridTable gridTable = new GridTable(info, store);
return gridTable;
}
@@ -568,7 +566,7 @@ public class InMemCubeBuilder implements Runnable {
}
private void closeStore(GridTable gt) throws IOException {
- ((GTMemDiskStore) gt.getStore()).close();
+ ((MemDiskStore) gt.getStore()).close();
}
// ===========================================================================
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
new file mode 100644
index 0000000..44376e5
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.apache.kylin.job.inmemcubing.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemDiskStore implements IGTStore, Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+ private static final boolean debug = true;
+
+ private static final int STREAM_BUFFER_SIZE = 8192;
+ private static final int MEM_CHUNK_SIZE_MB = 5;
+
+ private final GTInfo info;
+ private final Object lock; // all public methods that read/write object states are synchronized on this lock
+ private final MemPart memPart;
+ private final DiskPart diskPart;
+ private final boolean delOnClose;
+
+ private Writer ongoingWriter;
+
+ public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+ this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+ }
+
+ public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+ this(info, budgetCtrl, diskFile, false);
+ }
+
+ private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+ this.info = info;
+ this.lock = this;
+ this.memPart = new MemPart(budgetCtrl);
+ this.diskPart = new DiskPart(diskFile);
+ this.delOnClose = delOnClose;
+
+ // in case user forget to call close()
+ if (delOnClose)
+ diskFile.deleteOnExit();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public IGTStoreWriter rebuild(int shard) throws IOException {
+ return newWriter(0);
+ }
+
+ @Override
+ public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+ return newWriter(length());
+ }
+
+ private Writer newWriter(long startOffset) throws IOException {
+ synchronized (lock) {
+ if (ongoingWriter != null)
+ throw new IllegalStateException();
+
+ ongoingWriter = new Writer(startOffset);
+ return ongoingWriter;
+ }
+ }
+
+ @Override
+ public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+ synchronized (lock) {
+ return new Reader();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // synchronized inside the parts close()
+ memPart.close();
+ diskPart.close();
+ }
+
+ public long length() {
+ synchronized (lock) {
+ return Math.max(memPart.tailOffset(), diskPart.tailOffset);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+ }
+
+ private class Reader implements IGTStoreScanner {
+
+ final DataInputStream din;
+ long readOffset = 0;
+ long memRead = 0;
+ long diskRead = 0;
+ int nReadCalls = 0;
+
+ GTRowBlock block = GTRowBlock.allocate(info);
+ GTRowBlock next = null;
+
+ Reader() throws IOException {
+ diskPart.openRead();
+ if (debug)
+ logger.debug(MemDiskStore.this + " read start @ " + readOffset);
+
+ InputStream in = new InputStream() {
+ byte[] tmp = new byte[1];
+ MemChunk memChunk;
+
+ @Override
+ public int read() throws IOException {
+ int n = read(tmp, 0, 1);
+ if (n <= 0)
+ return -1;
+ else
+ return (int) tmp[0];
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ synchronized (lock) {
+ nReadCalls++;
+ if (available() <= 0)
+ return -1;
+
+ if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+ memChunk = memPart.seekMemChunk(readOffset);
+ }
+
+ int lenToGo = Math.min(available(), len);
+
+ int nRead = 0;
+ while (lenToGo > 0) {
+ int n;
+ if (memChunk != null) {
+ if (memChunk.headOffset() > readOffset) {
+ memChunk = null;
+ continue;
+ }
+ if (readOffset >= memChunk.tailOffset()) {
+ memChunk = memChunk.next;
+ continue;
+ }
+ int chunkOffset = (int) (readOffset - memChunk.headOffset());
+ n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+ System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+ memRead += n;
+ } else {
+ n = diskPart.read(readOffset, b, off, lenToGo);
+ diskRead += n;
+ }
+ lenToGo -= n;
+ nRead += n;
+ off += n;
+ readOffset += n;
+ }
+ return nRead;
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ synchronized (lock) {
+ return (int) (length() - readOffset);
+ }
+ }
+ };
+
+ din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ try {
+ if (din.available() > 0) {
+ block.importFrom(din);
+ next = block;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return next != null;
+ }
+
+ @Override
+ public GTRowBlock next() {
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+ GTRowBlock r = next;
+ next = null;
+ return r;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ din.close();
+ diskPart.closeRead();
+ if (debug)
+ logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+ }
+ }
+
+ }
+
+ private class Writer implements IGTStoreWriter {
+
+ final DataOutputStream dout;
+ long writeOffset;
+ long memWrite = 0;
+ long diskWrite = 0;
+ int nWriteCalls;
+ boolean closed = false;
+
+ Writer(long startOffset) throws IOException {
+ writeOffset = 0; // TODO does not support append yet
+ memPart.clear();
+ diskPart.clear();
+ diskPart.openWrite(false);
+ if (debug)
+ logger.debug(MemDiskStore.this + " write start @ " + writeOffset);
+
+ memPart.activateMemWrite();
+
+ OutputStream out = new OutputStream() {
+ byte[] tmp = new byte[1];
+ boolean memPartActivated = true;
+
+ @Override
+ public void write(int b) throws IOException {
+ tmp[0] = (byte) b;
+ write(tmp, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ // lock inside memPart.write() and diskPartm.write()
+ nWriteCalls++;
+ while (length > 0) {
+ int n;
+ if (memPartActivated) {
+ n = memPart.write(bytes, offset, length, writeOffset);
+ memWrite += n;
+ if (n == 0) {
+ memPartActivated = false;
+ }
+ } else {
+ n = diskPart.write(writeOffset, bytes, offset, length);
+ diskWrite += n;
+ }
+ offset += n;
+ length -= n;
+ writeOffset += n;
+ }
+ }
+ };
+ dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public void write(GTRowBlock block) throws IOException {
+ block.export(dout);
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (!closed) {
+ dout.close();
+ memPart.deactivateMemWrite();
+ }
+
+ if (memPart.asyncFlusher == null) {
+ assert writeOffset == diskPart.tailOffset;
+ diskPart.closeWrite();
+ ongoingWriter = null;
+ if (debug)
+ logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ } else {
+ // the asyncFlusher will call this close() again later
+ }
+ closed = true;
+ }
+ }
+ }
+
+ private static class MemChunk {
+ long diskOffset;
+ int length;
+ byte[] data;
+ MemChunk next;
+
+ boolean isFull() {
+ return length == data.length;
+ }
+
+ long headOffset() {
+ return diskOffset;
+ }
+
+ long tailOffset() {
+ return diskOffset + length;
+ }
+
+ int freeSpace() {
+ return data.length - length;
+ }
+ }
+
+ private class MemPart implements Closeable, MemoryConsumer {
+
+ final MemoryBudgetController budgetCtrl;
+
+ // async flush thread checks this flag out of sync block
+ volatile boolean writeActivated;
+ MemChunk firstChunk;
+ MemChunk lastChunk;
+ int chunkCount;
+
+ Thread asyncFlusher;
+ MemChunk asyncFlushChunk;
+ long asyncFlushDiskOffset;
+ Throwable asyncFlushException;
+
+ MemPart(MemoryBudgetController budgetCtrl) {
+ this.budgetCtrl = budgetCtrl;
+ }
+
+ long headOffset() {
+ return firstChunk == null ? 0 : firstChunk.headOffset();
+ }
+
+ long tailOffset() {
+ return lastChunk == null ? 0 : lastChunk.tailOffset();
+ }
+
+ public MemChunk seekMemChunk(long diskOffset) {
+ MemChunk c = firstChunk;
+ while (c != null && c.headOffset() <= diskOffset) {
+ if (diskOffset < c.tailOffset())
+ break;
+ c = c.next;
+ }
+ return c;
+ }
+
+ public int write(byte[] bytes, int offset, int length, long diskOffset) {
+ int needMoreMem = 0;
+
+ synchronized (lock) {
+ if (writeActivated == false)
+ return 0;
+
+ // write is only expected at the tail
+ if (diskOffset != tailOffset())
+ return 0;
+
+ if (chunkCount == 0 || lastChunk.isFull())
+ needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
+ }
+
+ // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
+ if (needMoreMem > 0) {
+ try {
+ budgetCtrl.reserve(this, needMoreMem);
+ } catch (NotEnoughBudgetException ex) {
+ deactivateMemWrite();
+ return 0;
+ }
+ }
+
+ synchronized (lock) {
+ if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
+ MemChunk chunk = new MemChunk();
+ chunk.diskOffset = diskOffset;
+ chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = chunk;
+ } else {
+ lastChunk.next = chunk;
+ lastChunk = chunk;
+ }
+ chunkCount++;
+ }
+
+ int n = Math.min(lastChunk.freeSpace(), length);
+ System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+ lastChunk.length += n;
+
+ if (n > 0)
+ asyncFlush(lastChunk, diskOffset, n);
+
+ return n;
+ }
+ }
+
+ private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+ if (asyncFlushChunk == null) {
+ asyncFlushChunk = lastChunk;
+ asyncFlushDiskOffset = diskOffset;
+ }
+
+ if (asyncFlusher == null) {
+ asyncFlusher = new Thread() {
+ public void run() {
+ asyncFlushException = null;
+ if (debug)
+ logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+ try {
+ while (writeActivated) {
+ flushToDisk();
+ Thread.sleep(10);
+ }
+ flushToDisk();
+
+ if (debug)
+ logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+ synchronized (lock) {
+ asyncFlusher = null;
+ asyncFlushChunk = null;
+ if (ongoingWriter.closed) {
+ ongoingWriter.close(); // call writer.close() again to clean up
+ }
+ }
+ } catch (Throwable ex) {
+ asyncFlushException = ex;
+ }
+ }
+ };
+ asyncFlusher.start();
+ }
+ }
+
+ private void flushToDisk() throws IOException {
+ byte[] data;
+ int offset = 0;
+ int length = 0;
+ int flushedLen = 0;
+
+ while (true) {
+ data = null;
+ synchronized (lock) {
+ asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+ // if (debug)
+ // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+ if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+ asyncFlushChunk = asyncFlushChunk.next;
+ }
+ if (asyncFlushChunk != null) {
+ data = asyncFlushChunk.data;
+ offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+ length = asyncFlushChunk.length - offset;
+ }
+ }
+
+ if (data == null)
+ break;
+
+ flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+ }
+ }
+
+ @Override
+ public int freeUp(int mb) {
+ synchronized (lock) {
+ int mbReleased = 0;
+ while (chunkCount > 0 && mbReleased < mb) {
+ if (firstChunk == asyncFlushChunk)
+ break;
+
+ mbReleased += MEM_CHUNK_SIZE_MB;
+ chunkCount--;
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = null;
+ } else {
+ MemChunk next = firstChunk.next;
+ firstChunk.next = null;
+ firstChunk = next;
+ }
+ }
+ return mbReleased;
+ }
+ }
+
+ public void activateMemWrite() {
+ if (budgetCtrl.getTotalBudgetMB() > 0) {
+ writeActivated = true;
+ if (debug)
+ logger.debug(MemDiskStore.this + " mem write activated");
+ }
+ }
+
+ public void deactivateMemWrite() {
+ writeActivated = false;
+ if (debug)
+ logger.debug(MemDiskStore.this + " mem write de-activated");
+ }
+
+ public void clear() {
+ chunkCount = 0;
+ firstChunk = lastChunk = null;
+ budgetCtrl.reserve(this, 0);
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+ }
+ try {
+ asyncFlusher.join();
+ } catch (NullPointerException npe) {
+ // that's fine, async flusher may not present
+ } catch (InterruptedException e) {
+ logger.warn("async join interrupted", e);
+ }
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+
+ clear();
+ }
+ }
+
+ private void throwAsyncException(Throwable ex) throws IOException {
+ if (ex instanceof IOException)
+ throw (IOException) ex;
+ else
+ throw new IOException(ex);
+ }
+
+ @Override
+ public String toString() {
+ return MemDiskStore.this.toString();
+ }
+
+ }
+
+ private class DiskPart implements Closeable {
+ final File diskFile;
+ FileChannel writeChannel;
+ FileChannel readChannel;
+ int readerCount = 0; // allow parallel readers
+ long tailOffset;
+
+ DiskPart(File diskFile) throws IOException {
+ this.diskFile = diskFile;
+ this.tailOffset = diskFile.length();
+ if (debug)
+ logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+ }
+
+ public void openRead() throws IOException {
+ if (readChannel == null) {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ }
+ readerCount++;
+ }
+
+ public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ }
+
+ public void closeRead() throws IOException {
+ closeRead(false);
+ }
+
+ private void closeRead(boolean force) throws IOException {
+ readerCount--;
+ if (readerCount == 0 || force) {
+ if (readChannel != null) {
+ readChannel.close();
+ readChannel = null;
+ }
+ }
+ }
+
+ public void openWrite(boolean append) throws IOException {
+ if (append) {
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+ tailOffset = diskFile.length();
+ } else {
+ diskFile.delete();
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ tailOffset = 0;
+ }
+ }
+
+ public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ synchronized (lock) {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ tailOffset = Math.max(diskOffset + n, tailOffset);
+ return n;
+ }
+ }
+
+ public void closeWrite() throws IOException {
+ if (writeChannel != null) {
+ writeChannel.close();
+ writeChannel = null;
+ }
+ }
+
+ public void clear() throws IOException {
+ diskFile.delete();
+ tailOffset = 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ closeWrite();
+ closeRead(true);
+ if (delOnClose) {
+ diskFile.delete();
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
new file mode 100644
index 0000000..4b50160
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBudgetController {
+
+ private static final boolean debug = true;
+
+ public static interface MemoryConsumer {
+ // return number MB released
+ int freeUp(int mb);
+ }
+
+ @SuppressWarnings("serial")
+ public static class NotEnoughBudgetException extends IllegalStateException {
+
+ public NotEnoughBudgetException() {
+ super();
+ }
+
+ public NotEnoughBudgetException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ private static class ConsumerEntry {
+ final MemoryConsumer consumer;
+ int reservedMB;
+
+ ConsumerEntry(MemoryConsumer consumer) {
+ this.consumer = consumer;
+ }
+ }
+
+ public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+ public static final int ONE_MB = 1024 * 1024;
+
+ private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+ // all budget numbers are in MB
+ private final int totalBudgetMB;
+ private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+ private int totalReservedMB;
+ private final ReentrantLock lock = new ReentrantLock();
+
+ public MemoryBudgetController(int totalBudgetMB) {
+ if (totalBudgetMB < 0)
+ throw new IllegalArgumentException();
+ if (totalBudgetMB > getSystemAvailMB())
+ throw new IllegalStateException();
+
+ this.totalBudgetMB = totalBudgetMB;
+ this.totalReservedMB = 0;
+ }
+
+ public int getTotalBudgetMB() {
+ return totalBudgetMB;
+ }
+
+ public int getTotalReservedMB() {
+ lock.lock();
+ try {
+ return totalReservedMB;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getRemainingBudgetMB() {
+ lock.lock();
+ try {
+ return totalBudgetMB - totalReservedMB;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+ long waitStart = 0;
+ while (true) {
+ try {
+ reserve(consumer, requestMB);
+ if (debug && waitStart > 0)
+ logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+ return;
+ } catch (NotEnoughBudgetException ex) {
+ // retry
+ }
+
+ if (waitStart == 0)
+ waitStart = System.currentTimeMillis();
+
+ synchronized (lock) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ throw new NotEnoughBudgetException(e);
+ }
+ }
+ }
+ }
+
+ /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+ public void reserve(MemoryConsumer consumer, int requestMB) {
+ if (totalBudgetMB == 0 && requestMB > 0)
+ throw new NotEnoughBudgetException();
+
+ boolean ok = false;
+ while (!ok) {
+ int gap = calculateGap(consumer, requestMB);
+ if (gap > 0) {
+ // to void deadlock, don't hold lock when invoking consumer.freeUp()
+ tryFreeUp(gap);
+ }
+ ok = updateBooking(consumer, requestMB);
+ }
+ }
+
+ private int calculateGap(MemoryConsumer consumer, int requestMB) {
+ lock.lock();
+ try {
+ ConsumerEntry entry = booking.get(consumer);
+ int curMB = entry == null ? 0 : entry.reservedMB;
+ int delta = requestMB - curMB;
+ return delta - (totalBudgetMB - totalReservedMB);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void tryFreeUp(int gap) {
+ // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
+ for (ConsumerEntry entry : booking.values()) {
+ int mb = entry.consumer.freeUp(gap);
+ if (mb > 0) {
+ lock.lock();
+ try {
+ updateBookingWithDelta(entry.consumer, -mb);
+ } finally {
+ lock.unlock();
+ }
+ gap -= mb;
+ if (gap <= 0)
+ break;
+ }
+ }
+ if (gap > 0)
+ throw new NotEnoughBudgetException();
+
+ if (debug) {
+ if (getSystemAvailMB() < getRemainingBudgetMB()) {
+ logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+ }
+ }
+ }
+
+ private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+ lock.lock();
+ try {
+ ConsumerEntry entry = booking.get(consumer);
+ if (entry == null) {
+ if (requestMB == 0)
+ return true;
+
+ entry = new ConsumerEntry(consumer);
+ booking.put(consumer, entry);
+ }
+
+ int delta = requestMB - entry.reservedMB;
+ return updateBookingWithDelta(consumer, delta);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // lock MUST be obtained before entering
+ private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+ if (delta == 0)
+ return true;
+
+ ConsumerEntry entry = booking.get(consumer);
+ if (entry == null) {
+ if (delta <= 0)
+ return true;
+
+ entry = new ConsumerEntry(consumer);
+ booking.put(consumer, entry);
+ }
+
+ // double check gap again, it may be changed by other concurrent requests
+ if (delta > 0) {
+ int gap = delta - (totalBudgetMB - totalReservedMB);
+ if (gap > 0)
+ return false;
+ }
+
+ totalReservedMB += delta;
+ entry.reservedMB += delta;
+ if (entry.reservedMB == 0) {
+ booking.remove(entry.consumer);
+ }
+ if (debug) {
+ logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ }
+
+ if (delta < 0) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+
+ return true;
+ }
+
+ public static long getSystemAvailBytes() {
+ Runtime runtime = Runtime.getRuntime();
+ long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+ long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+ long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+ long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+ long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+ return availableMemory;
+ }
+
+ public static int getSystemAvailMB() {
+ return (int) (getSystemAvailBytes() / ONE_MB);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..37eb42d
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.job.inmemcubing.MemDiskStore;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.SimpleGridTableTest;
+import org.junit.Test;
+
+public class GTMemDiskStoreTest {
+
+ final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+ final GTInfo info = SimpleGridTableTest.advancedInfo();
+ final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+ @Test
+ public void testSingleThreadWriteRead() throws IOException {
+ long start = System.currentTimeMillis();
+ verifyOneTableWriteAndRead();
+ long end = System.currentTimeMillis();
+ System.out.println("Cost " + (end - start) + " millis");
+ }
+
+ @Test
+ public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+ long start = System.currentTimeMillis();
+
+ int nThreads = 5;
+ Thread[] t = new Thread[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ t[i] = new Thread() {
+ public void run() {
+ try {
+ verifyOneTableWriteAndRead();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+ t[i].start();
+ }
+ for (int i = 0; i < nThreads; i++) {
+ t[i].join();
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("Cost " + (end - start) + " millis");
+ }
+
+ private void verifyOneTableWriteAndRead() throws IOException {
+ MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+ GridTable table = new GridTable(info, store);
+ verifyWriteAndRead(table);
+ }
+
+ private void verifyWriteAndRead(GridTable table) throws IOException {
+ GTInfo info = table.getInfo();
+
+ GTBuilder builder = table.rebuild();
+ for (GTRecord r : data) {
+ builder.write(r);
+ }
+ builder.close();
+
+ IGTScanner scanner = table.scan(new GTScanRequest(info));
+ int i = 0;
+ for (GTRecord r : scanner) {
+ assertEquals(data.get(i++), r);
+ }
+ scanner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..e6a1fb4
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+ @Test
+ public void test() {
+ final int n = MemoryBudgetController.getSystemAvailMB() / 2;
+ final MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+ ArrayList<Consumer> mbList = new ArrayList<Consumer>();
+ for (int i = 0; i < n; i++) {
+ mbList.add(new Consumer(mbc));
+ assertEquals(mbList.size(), mbc.getTotalReservedMB());
+ }
+
+ // a's reservation will free up all the previous
+ final Consumer a = new Consumer();
+ mbc.reserve(a, n);
+ for (int i = 0; i < n; i++) {
+ assertEquals(null, mbList.get(i).data);
+ }
+
+ // cancel a in 2 seconds
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ mbc.reserve(a, 0);
+ }
+ }.start();
+
+ // b will success after some wait
+ long bWaitStart = System.currentTimeMillis();
+ final Consumer b = new Consumer();
+ mbc.reserveInsist(b, n);
+ assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
+
+ try {
+ mbc.reserve(a, 1);
+ fail();
+ } catch (NotEnoughBudgetException ex) {
+ // expected
+ }
+ }
+
+ class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+ byte[] data;
+
+ Consumer() {
+ }
+
+ Consumer(MemoryBudgetController mbc) {
+ mbc.reserve(this, 1);
+ data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
+ }
+
+ @Override
+ public int freeUp(int mb) {
+ if (data != null) {
+ data = null;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
deleted file mode 100644
index 65d4905..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ /dev/null
@@ -1,661 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import static org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GTMemDiskStore implements IGTStore, Closeable {
-
- private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
- private static final boolean debug = true;
-
- private static final int STREAM_BUFFER_SIZE = 8192;
- private static final int MEM_CHUNK_SIZE_MB = 5;
-
- private final GTInfo info;
- private final Object lock; // all public methods that read/write object states are synchronized on this lock
- private final MemPart memPart;
- private final DiskPart diskPart;
- private final boolean delOnClose;
-
- private Writer ongoingWriter;
-
- public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
- this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
- }
-
- public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
- this(info, budgetCtrl, diskFile, false);
- }
-
- private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
- this.info = info;
- this.lock = this;
- this.memPart = new MemPart(budgetCtrl);
- this.diskPart = new DiskPart(diskFile);
- this.delOnClose = delOnClose;
-
- // in case user forget to call close()
- if (delOnClose)
- diskFile.deleteOnExit();
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public IGTStoreWriter rebuild(int shard) throws IOException {
- return newWriter(0);
- }
-
- @Override
- public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
- return newWriter(length());
- }
-
- private Writer newWriter(long startOffset) throws IOException {
- synchronized (lock) {
- if (ongoingWriter != null)
- throw new IllegalStateException();
-
- ongoingWriter = new Writer(startOffset);
- return ongoingWriter;
- }
- }
-
- @Override
- public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
- synchronized (lock) {
- return new Reader();
- }
- }
-
- @Override
- public void close() throws IOException {
- // synchronized inside the parts close()
- memPart.close();
- diskPart.close();
- }
-
- public long length() {
- synchronized (lock) {
- return Math.max(memPart.tailOffset(), diskPart.tailOffset);
- }
- }
-
- @Override
- public String toString() {
- return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
- }
-
- private class Reader implements IGTStoreScanner {
-
- final DataInputStream din;
- long readOffset = 0;
- long memRead = 0;
- long diskRead = 0;
- int nReadCalls = 0;
-
- GTRowBlock block = GTRowBlock.allocate(info);
- GTRowBlock next = null;
-
- Reader() throws IOException {
- diskPart.openRead();
- if (debug)
- logger.debug(GTMemDiskStore.this + " read start @ " + readOffset);
-
- InputStream in = new InputStream() {
- byte[] tmp = new byte[1];
- MemChunk memChunk;
-
- @Override
- public int read() throws IOException {
- int n = read(tmp, 0, 1);
- if (n <= 0)
- return -1;
- else
- return (int) tmp[0];
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- synchronized (lock) {
- nReadCalls++;
- if (available() <= 0)
- return -1;
-
- if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
- memChunk = memPart.seekMemChunk(readOffset);
- }
-
- int lenToGo = Math.min(available(), len);
-
- int nRead = 0;
- while (lenToGo > 0) {
- int n;
- if (memChunk != null) {
- if (memChunk.headOffset() > readOffset) {
- memChunk = null;
- continue;
- }
- if (readOffset >= memChunk.tailOffset()) {
- memChunk = memChunk.next;
- continue;
- }
- int chunkOffset = (int) (readOffset - memChunk.headOffset());
- n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
- System.arraycopy(memChunk.data, chunkOffset, b, off, n);
- memRead += n;
- } else {
- n = diskPart.read(readOffset, b, off, lenToGo);
- diskRead += n;
- }
- lenToGo -= n;
- nRead += n;
- off += n;
- readOffset += n;
- }
- return nRead;
- }
- }
-
- @Override
- public int available() throws IOException {
- synchronized (lock) {
- return (int) (length() - readOffset);
- }
- }
- };
-
- din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
- }
-
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
-
- try {
- if (din.available() > 0) {
- block.importFrom(din);
- next = block;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return next != null;
- }
-
- @Override
- public GTRowBlock next() {
- if (next == null) {
- hasNext();
- if (next == null)
- throw new NoSuchElementException();
- }
- GTRowBlock r = next;
- next = null;
- return r;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- din.close();
- diskPart.closeRead();
- if (debug)
- logger.debug(GTMemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
- }
- }
-
- }
-
- private class Writer implements IGTStoreWriter {
-
- final DataOutputStream dout;
- long writeOffset;
- long memWrite = 0;
- long diskWrite = 0;
- int nWriteCalls;
- boolean closed = false;
-
- Writer(long startOffset) throws IOException {
- writeOffset = 0; // TODO does not support append yet
- memPart.clear();
- diskPart.clear();
- diskPart.openWrite(false);
- if (debug)
- logger.debug(GTMemDiskStore.this + " write start @ " + writeOffset);
-
- memPart.activateMemWrite();
-
- OutputStream out = new OutputStream() {
- byte[] tmp = new byte[1];
- boolean memPartActivated = true;
-
- @Override
- public void write(int b) throws IOException {
- tmp[0] = (byte) b;
- write(tmp, 0, 1);
- }
-
- @Override
- public void write(byte[] bytes, int offset, int length) throws IOException {
- // lock inside memPart.write() and diskPartm.write()
- nWriteCalls++;
- while (length > 0) {
- int n;
- if (memPartActivated) {
- n = memPart.write(bytes, offset, length, writeOffset);
- memWrite += n;
- if (n == 0) {
- memPartActivated = false;
- }
- } else {
- n = diskPart.write(writeOffset, bytes, offset, length);
- diskWrite += n;
- }
- offset += n;
- length -= n;
- writeOffset += n;
- }
- }
- };
- dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
- }
-
- @Override
- public void write(GTRowBlock block) throws IOException {
- block.export(dout);
- }
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (!closed) {
- dout.close();
- memPart.deactivateMemWrite();
- }
-
- if (memPart.asyncFlusher == null) {
- assert writeOffset == diskPart.tailOffset;
- diskPart.closeWrite();
- ongoingWriter = null;
- if (debug)
- logger.debug(GTMemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
- } else {
- // the asyncFlusher will call this close() again later
- }
- closed = true;
- }
- }
- }
-
- private static class MemChunk {
- long diskOffset;
- int length;
- byte[] data;
- MemChunk next;
-
- boolean isFull() {
- return length == data.length;
- }
-
- long headOffset() {
- return diskOffset;
- }
-
- long tailOffset() {
- return diskOffset + length;
- }
-
- int freeSpace() {
- return data.length - length;
- }
- }
-
- private class MemPart implements Closeable, MemoryConsumer {
-
- final MemoryBudgetController budgetCtrl;
-
- // async flush thread checks this flag out of sync block
- volatile boolean writeActivated;
- MemChunk firstChunk;
- MemChunk lastChunk;
- int chunkCount;
-
- Thread asyncFlusher;
- MemChunk asyncFlushChunk;
- long asyncFlushDiskOffset;
- Throwable asyncFlushException;
-
- MemPart(MemoryBudgetController budgetCtrl) {
- this.budgetCtrl = budgetCtrl;
- }
-
- long headOffset() {
- return firstChunk == null ? 0 : firstChunk.headOffset();
- }
-
- long tailOffset() {
- return lastChunk == null ? 0 : lastChunk.tailOffset();
- }
-
- public MemChunk seekMemChunk(long diskOffset) {
- MemChunk c = firstChunk;
- while (c != null && c.headOffset() <= diskOffset) {
- if (diskOffset < c.tailOffset())
- break;
- c = c.next;
- }
- return c;
- }
-
- public int write(byte[] bytes, int offset, int length, long diskOffset) {
- int needMoreMem = 0;
-
- synchronized (lock) {
- if (writeActivated == false)
- return 0;
-
- // write is only expected at the tail
- if (diskOffset != tailOffset())
- return 0;
-
- if (chunkCount == 0 || lastChunk.isFull())
- needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
- }
-
- // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
- if (needMoreMem > 0) {
- try {
- budgetCtrl.reserve(this, needMoreMem);
- } catch (NotEnoughBudgetException ex) {
- deactivateMemWrite();
- return 0;
- }
- }
-
- synchronized (lock) {
- if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
- MemChunk chunk = new MemChunk();
- chunk.diskOffset = diskOffset;
- chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
- if (chunkCount == 0) {
- firstChunk = lastChunk = chunk;
- } else {
- lastChunk.next = chunk;
- lastChunk = chunk;
- }
- chunkCount++;
- }
-
- int n = Math.min(lastChunk.freeSpace(), length);
- System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
- lastChunk.length += n;
-
- if (n > 0)
- asyncFlush(lastChunk, diskOffset, n);
-
- return n;
- }
- }
-
- private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
- if (asyncFlushChunk == null) {
- asyncFlushChunk = lastChunk;
- asyncFlushDiskOffset = diskOffset;
- }
-
- if (asyncFlusher == null) {
- asyncFlusher = new Thread() {
- public void run() {
- asyncFlushException = null;
- if (debug)
- logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
- try {
- while (writeActivated) {
- flushToDisk();
- Thread.sleep(10);
- }
- flushToDisk();
-
- if (debug)
- logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
-
- synchronized (lock) {
- asyncFlusher = null;
- asyncFlushChunk = null;
- if (ongoingWriter.closed) {
- ongoingWriter.close(); // call writer.close() again to clean up
- }
- }
- } catch (Throwable ex) {
- asyncFlushException = ex;
- }
- }
- };
- asyncFlusher.start();
- }
- }
-
- private void flushToDisk() throws IOException {
- byte[] data;
- int offset = 0;
- int length = 0;
- int flushedLen = 0;
-
- while (true) {
- data = null;
- synchronized (lock) {
- asyncFlushDiskOffset += flushedLen; // bytes written in last loop
- // if (debug)
- // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
- if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
- asyncFlushChunk = asyncFlushChunk.next;
- }
- if (asyncFlushChunk != null) {
- data = asyncFlushChunk.data;
- offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
- length = asyncFlushChunk.length - offset;
- }
- }
-
- if (data == null)
- break;
-
- flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
- }
- }
-
- @Override
- public int freeUp(int mb) {
- synchronized (lock) {
- int mbReleased = 0;
- while (chunkCount > 0 && mbReleased < mb) {
- if (firstChunk == asyncFlushChunk)
- break;
-
- mbReleased += MEM_CHUNK_SIZE_MB;
- chunkCount--;
- if (chunkCount == 0) {
- firstChunk = lastChunk = null;
- } else {
- MemChunk next = firstChunk.next;
- firstChunk.next = null;
- firstChunk = next;
- }
- }
- return mbReleased;
- }
- }
-
- public void activateMemWrite() {
- if (budgetCtrl.getTotalBudgetMB() > 0) {
- writeActivated = true;
- if (debug)
- logger.debug(GTMemDiskStore.this + " mem write activated");
- }
- }
-
- public void deactivateMemWrite() {
- writeActivated = false;
- if (debug)
- logger.debug(GTMemDiskStore.this + " mem write de-activated");
- }
-
- public void clear() {
- chunkCount = 0;
- firstChunk = lastChunk = null;
- budgetCtrl.reserve(this, 0);
- }
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (asyncFlushException != null)
- throwAsyncException(asyncFlushException);
- }
- try {
- asyncFlusher.join();
- } catch (NullPointerException npe) {
- // that's fine, async flusher may not present
- } catch (InterruptedException e) {
- logger.warn("async join interrupted", e);
- }
- synchronized (lock) {
- if (asyncFlushException != null)
- throwAsyncException(asyncFlushException);
-
- clear();
- }
- }
-
- private void throwAsyncException(Throwable ex) throws IOException {
- if (ex instanceof IOException)
- throw (IOException) ex;
- else
- throw new IOException(ex);
- }
-
- @Override
- public String toString() {
- return GTMemDiskStore.this.toString();
- }
-
- }
-
- private class DiskPart implements Closeable {
- final File diskFile;
- FileChannel writeChannel;
- FileChannel readChannel;
- int readerCount = 0; // allow parallel readers
- long tailOffset;
-
- DiskPart(File diskFile) throws IOException {
- this.diskFile = diskFile;
- this.tailOffset = diskFile.length();
- if (debug)
- logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
- }
-
- public void openRead() throws IOException {
- if (readChannel == null) {
- readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
- }
- readerCount++;
- }
-
- public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
- }
-
- public void closeRead() throws IOException {
- closeRead(false);
- }
-
- private void closeRead(boolean force) throws IOException {
- readerCount--;
- if (readerCount == 0 || force) {
- if (readChannel != null) {
- readChannel.close();
- readChannel = null;
- }
- }
- }
-
- public void openWrite(boolean append) throws IOException {
- if (append) {
- writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
- tailOffset = diskFile.length();
- } else {
- diskFile.delete();
- writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
- tailOffset = 0;
- }
- }
-
- public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- synchronized (lock) {
- int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
- tailOffset = Math.max(diskOffset + n, tailOffset);
- return n;
- }
- }
-
- public void closeWrite() throws IOException {
- if (writeChannel != null) {
- writeChannel.close();
- writeChannel = null;
- }
- }
-
- public void clear() throws IOException {
- diskFile.delete();
- tailOffset = 0;
- }
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- closeWrite();
- closeRead(true);
- if (delOnClose) {
- diskFile.delete();
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
deleted file mode 100644
index c4efa75..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
- private static final boolean debug = true;
-
- public static interface MemoryConsumer {
- // return number MB released
- int freeUp(int mb);
- }
-
- @SuppressWarnings("serial")
- public static class NotEnoughBudgetException extends IllegalStateException {
-
- public NotEnoughBudgetException() {
- super();
- }
-
- public NotEnoughBudgetException(Throwable cause) {
- super(cause);
- }
- }
-
- private static class ConsumerEntry {
- final MemoryConsumer consumer;
- int reservedMB;
-
- ConsumerEntry(MemoryConsumer consumer) {
- this.consumer = consumer;
- }
- }
-
- public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
- public static final int ONE_MB = 1024 * 1024;
-
- private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
- // all budget numbers are in MB
- private final int totalBudgetMB;
- private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
- private int totalReservedMB;
- private final ReentrantLock lock = new ReentrantLock();
-
- public MemoryBudgetController(int totalBudgetMB) {
- if (totalBudgetMB < 0)
- throw new IllegalArgumentException();
- if (totalBudgetMB > getSystemAvailMB())
- throw new IllegalStateException();
-
- this.totalBudgetMB = totalBudgetMB;
- this.totalReservedMB = 0;
- }
-
- public int getTotalBudgetMB() {
- return totalBudgetMB;
- }
-
- public int getTotalReservedMB() {
- lock.lock();
- try {
- return totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public int getRemainingBudgetMB() {
- lock.lock();
- try {
- return totalBudgetMB - totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public void reserveInsist(MemoryConsumer consumer, int requestMB) {
- long waitStart = 0;
- while (true) {
- try {
- reserve(consumer, requestMB);
- if (debug && waitStart > 0)
- logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
- return;
- } catch (NotEnoughBudgetException ex) {
- // retry
- }
-
- if (waitStart == 0)
- waitStart = System.currentTimeMillis();
-
- synchronized (lock) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- throw new NotEnoughBudgetException(e);
- }
- }
- }
- }
-
- /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
- public void reserve(MemoryConsumer consumer, int requestMB) {
- if (totalBudgetMB == 0 && requestMB > 0)
- throw new NotEnoughBudgetException();
-
- boolean ok = false;
- while (!ok) {
- int gap = calculateGap(consumer, requestMB);
- if (gap > 0) {
- // to void deadlock, don't hold lock when invoking consumer.freeUp()
- tryFreeUp(gap);
- }
- ok = updateBooking(consumer, requestMB);
- }
- }
-
- private int calculateGap(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- int curMB = entry == null ? 0 : entry.reservedMB;
- int delta = requestMB - curMB;
- return delta - (totalBudgetMB - totalReservedMB);
- } finally {
- lock.unlock();
- }
- }
-
- private void tryFreeUp(int gap) {
- // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
- for (ConsumerEntry entry : booking.values()) {
- int mb = entry.consumer.freeUp(gap);
- if (mb > 0) {
- lock.lock();
- try {
- updateBookingWithDelta(entry.consumer, -mb);
- } finally {
- lock.unlock();
- }
- gap -= mb;
- if (gap <= 0)
- break;
- }
- }
- if (gap > 0)
- throw new NotEnoughBudgetException();
-
- if (debug) {
- if (getSystemAvailMB() < getRemainingBudgetMB()) {
- logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
- }
- }
- }
-
- private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (requestMB == 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- int delta = requestMB - entry.reservedMB;
- return updateBookingWithDelta(consumer, delta);
- } finally {
- lock.unlock();
- }
- }
-
- // lock MUST be obtained before entering
- private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
- if (delta == 0)
- return true;
-
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (delta <= 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- // double check gap again, it may be changed by other concurrent requests
- if (delta > 0) {
- int gap = delta - (totalBudgetMB - totalReservedMB);
- if (gap > 0)
- return false;
- }
-
- totalReservedMB += delta;
- entry.reservedMB += delta;
- if (entry.reservedMB == 0) {
- booking.remove(entry.consumer);
- }
- if (debug) {
- logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
- }
-
- if (delta < 0) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
-
- return true;
- }
-
- public static long getSystemAvailBytes() {
- Runtime runtime = Runtime.getRuntime();
- long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
- long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
- long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
- long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
- long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
- return availableMemory;
- }
-
- public static int getSystemAvailMB() {
- return (int) (getSystemAvailBytes() / ONE_MB);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
index 2cafde0..6520795 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.storage.gridtable;
import java.math.BigDecimal;
@@ -15,10 +32,8 @@ import org.apache.kylin.metadata.measure.DoubleSumAggregator;
import org.apache.kylin.metadata.measure.HLLCAggregator;
import org.apache.kylin.metadata.measure.LongSumAggregator;
import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
import org.junit.Test;
-/** Note: Execute each test alone to get accurate size estimate. */
public class AggregationCacheMemSizeTest {
public static final int NUM_OF_OBJS = 1000000 / 2;
@@ -182,7 +197,17 @@ public class AggregationCacheMemSizeTest {
private long memLeft() throws InterruptedException {
Runtime.getRuntime().gc();
Thread.sleep(500);
- return MemoryBudgetController.getSystemAvailBytes();
+ return getSystemAvailBytes();
+ }
+
+ private long getSystemAvailBytes() {
+ Runtime runtime = Runtime.getRuntime();
+ long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+ long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+ long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+ long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+ long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+ return availableMemory;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index d9d1eff..0dec3c3 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.storage.gridtable;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
deleted file mode 100644
index 4441687..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
-import org.junit.Test;
-
-public class GTMemDiskStoreTest {
-
- final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
- final GTInfo info = SimpleGridTableTest.advancedInfo();
- final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
-
- @Test
- public void testSingleThreadWriteRead() throws IOException {
- long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead();
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- @Test
- public void testMultiThreadWriteRead() throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
-
- int nThreads = 5;
- Thread[] t = new Thread[nThreads];
- for (int i = 0; i < nThreads; i++) {
- t[i] = new Thread() {
- public void run() {
- try {
- verifyOneTableWriteAndRead();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- };
- t[i].start();
- }
- for (int i = 0; i < nThreads; i++) {
- t[i].join();
- }
-
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- private void verifyOneTableWriteAndRead() throws IOException {
- GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
- GridTable table = new GridTable(info, store);
- verifyWriteAndRead(table);
- }
-
- private void verifyWriteAndRead(GridTable table) throws IOException {
- GTInfo info = table.getInfo();
-
- GTBuilder builder = table.rebuild();
- for (GTRecord r : data) {
- builder.write(r);
- }
- builder.close();
-
- IGTScanner scanner = table.scan(new GTScanRequest(info));
- int i = 0;
- for (GTRecord r : scanner) {
- assertEquals(data.get(i++), r);
- }
- scanner.close();
- }
-}