You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/02 01:31:03 UTC
incubator-kylin git commit: KYLIN-806,
ConcurrentDiskStore that allows concurrent readers,
no obvious improvement compared to MemDiskStore. (ConcurrentDiskStoreTest)
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 4739e0c4a -> d27319b58
KYLIN-806, ConcurrentDiskStore that allows concurrent readers, no obvious
improvement compared to MemDiskStore. (ConcurrentDiskStoreTest)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d27319b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d27319b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d27319b5
Branch: refs/heads/0.8.0
Commit: d27319b587f5b9c6e7dc1599be5f98217375c0c9
Parents: 4739e0c
Author: Yang Li <li...@apache.org>
Authored: Tue Jun 2 07:16:19 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Jun 2 07:29:06 2015 +0800
----------------------------------------------------------------------
.../job/inmemcubing/ConcurrentDiskStore.java | 324 ++++++++++++++++++-
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 16 +-
.../kylin/job/inmemcubing/MemDiskStore.java | 2 +-
.../inmemcubing/ConcurrentDiskStoreTest.java | 93 ++++++
.../job/inmemcubing/GTMemDiskStoreTest.java | 90 ------
.../kylin/job/inmemcubing/MemDiskStoreTest.java | 96 ++++++
.../storage/gridtable/UnitTestSupport.java | 101 ++++++
.../storage/gridtable/SimpleGridTableTest.java | 78 +----
.../gridtable/SimpleInvertedIndexTest.java | 2 +-
9 files changed, 632 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
index cca1040..1ba7e4d 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
@@ -17,6 +17,326 @@
package org.apache.kylin.job.inmemcubing;
-public class ConcurrentDiskStore {
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
-}
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A disk store that allows concurrent reads and exclusive writes.
+ * <p>
+ * All row blocks are kept in a single file. At any moment the store is either idle, being
+ * written by exactly one writer, or being scanned by one or more readers. A request that would
+ * mix these states (a writer while readers are active, a reader while the writer is active, or
+ * a second writer) fails with {@link IllegalStateException} rather than blocking.
+ * <p>
+ * Readers share one read {@link FileChannel} and use positional reads, each reader keeping its
+ * own offset.
+ */
+public class ConcurrentDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConcurrentDiskStore.class);
+    private static final boolean debug = true;
+
+    private static final int STREAM_BUFFER_SIZE = 8192;
+
+    final private GTInfo info;
+    final private Object lock;
+
+    final private File diskFile;
+    final private boolean delOnClose;
+
+    // all fields below are guarded by 'lock'
+    private Writer activeWriter;
+    private final HashSet<Reader> activeReaders = new HashSet<Reader>();
+    private FileChannel writeChannel;
+    private FileChannel readChannel; // sharable across multi-threads
+
+    /**
+     * Creates a store backed by a fresh temporary file, which is deleted on {@link #close()}.
+     */
+    public ConcurrentDiskStore(GTInfo info) throws IOException {
+        this(info, File.createTempFile("ConcurrentDiskStore", ""), true);
+    }
+
+    /**
+     * Creates a store backed by the given file; the file is kept on {@link #close()}.
+     */
+    public ConcurrentDiskStore(GTInfo info, File diskFile) throws IOException {
+        this(info, diskFile, false);
+    }
+
+    private ConcurrentDiskStore(GTInfo info, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.lock = this;
+        this.diskFile = diskFile;
+        this.delOnClose = delOnClose;
+
+        // in case user forget to call close()
+        if (delOnClose)
+            diskFile.deleteOnExit();
+
+        if (debug)
+            logger.debug(this + " disk file " + diskFile.getAbsolutePath());
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * Starts rewriting the store from offset 0, discarding the previous file content.
+     * The {@code shard} argument is not used.
+     *
+     * @throws IllegalStateException if a writer or any reader is still active
+     */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return newWriter(0);
+    }
+
+    /**
+     * Not supported yet; always throws.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        throw new IllegalStateException("does not support append yet");
+        //return newWriter(diskFile.length());
+    }
+
+    private IGTStoreWriter newWriter(long startOffset) throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null || !activeReaders.isEmpty())
+                throw new IllegalStateException("cannot write while another writer or reader is active");
+
+            openWriteChannel(startOffset);
+            activeWriter = new Writer(startOffset);
+            return activeWriter;
+        }
+    }
+
+    private void closeWriter(Writer w) {
+        synchronized (lock) {
+            if (activeWriter != w)
+                throw new IllegalStateException("writer is not the active writer");
+
+            activeWriter = null;
+            closeWriteChannel();
+        }
+    }
+
+    /**
+     * Opens a scanner over the whole file, starting at offset 0. None of the arguments is used,
+     * so every block in the file is returned.
+     *
+     * @throws IllegalStateException if a writer is still active
+     */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return newReader();
+    }
+
+    private IGTStoreScanner newReader() throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null)
+                throw new IllegalStateException("cannot read while a writer is active");
+
+            openReadChannel();
+            Reader r = new Reader(0);
+            activeReaders.add(r);
+            return r;
+        }
+    }
+
+    private void closeReader(Reader r) throws IOException {
+        synchronized (lock) {
+            if (!activeReaders.remove(r))
+                throw new IllegalStateException("reader is not active");
+
+            // the shared read channel lives only as long as there is at least one reader
+            if (activeReaders.isEmpty())
+                closeReadChannel();
+        }
+    }
+
+    /**
+     * Scanner over the disk file. A single {@link GTRowBlock} instance is reused for every
+     * element, so a block returned by {@link #next()} is overwritten when the following block
+     * is loaded.
+     */
+    private class Reader implements IGTStoreScanner {
+        final DataInputStream din;
+        long fileLen;
+        long readOffset;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader(long startOffset) throws IOException {
+            this.fileLen = diskFile.length();
+            this.readOffset = startOffset;
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " read start @ " + readOffset);
+
+            InputStream in = new InputStream() {
+                final byte[] tmp = new byte[1];
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+
+                    // must return 0..255; a plain cast would sign-extend byte 0xFF to -1,
+                    // which callers interpret as end of stream
+                    return tmp[0] & 0xFF;
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    if (available() <= 0)
+                        return -1;
+
+                    int lenToGo = Math.min(available(), len);
+                    int nRead = 0;
+                    while (lenToGo > 0) {
+                        int n = readChannel.read(ByteBuffer.wrap(b, off, lenToGo), readOffset);
+                        // the file is shorter than the length recorded at open time; without this
+                        // check the loop would never terminate and the offsets would go backwards
+                        if (n < 0)
+                            throw new IOException("Unexpected end of file " + diskFile.getAbsolutePath() + " @ " + readOffset + ", expected length " + fileLen);
+
+                        lenToGo -= n;
+                        nRead += n;
+                        off += n;
+                        readOffset += n;
+                    }
+                    return nRead;
+                }
+
+                @Override
+                public int available() throws IOException {
+                    // clamp into int range; a plain cast turns negative once more than 2 GB
+                    // remains and would be taken for end of stream
+                    long remaining = fileLen - readOffset;
+                    return (int) Math.max(0L, Math.min((long) Integer.MAX_VALUE, remaining));
+                }
+            };
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                din.close();
+            } finally {
+                // always deregister, otherwise the store could never be written or closed again
+                closeReader(this);
+            }
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " read end @ " + readOffset);
+        }
+    }
+
+    /**
+     * The single writer of the store; exports row blocks through a buffered stream onto the
+     * write channel using positional writes.
+     */
+    private class Writer implements IGTStoreWriter {
+        final DataOutputStream dout;
+        long writeOffset;
+
+        Writer(long startOffset) {
+            this.writeOffset = startOffset;
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " write start @ " + writeOffset);
+
+            OutputStream out = new OutputStream() {
+                final byte[] tmp = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    // a channel write may be partial, keep going until everything is on disk
+                    while (length > 0) {
+                        int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), writeOffset);
+                        offset += n;
+                        length -= n;
+                        writeOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                dout.close(); // flushes the buffered tail to the channel
+            } finally {
+                // always release the write slot and the channel, even if the final flush failed
+                closeWriter(this);
+            }
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " write end @ " + writeOffset);
+        }
+    }
+
+    private void openWriteChannel(long startOffset) throws IOException {
+        if (startOffset > 0) { // TODO does not support append yet
+            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+        } else {
+            // start over with an empty file
+            diskFile.delete();
+            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+        }
+    }
+
+    private void closeWriteChannel() {
+        IOUtils.closeQuietly(writeChannel);
+        writeChannel = null;
+    }
+
+    private void openReadChannel() throws IOException {
+        if (readChannel == null) {
+            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+        }
+    }
+
+    private void closeReadChannel() throws IOException {
+        IOUtils.closeQuietly(readChannel);
+        readChannel = null;
+    }
+
+    /**
+     * Closes the store and deletes the disk file if it was created by the store itself.
+     *
+     * @throws IllegalStateException if a writer or any reader is still active
+     */
+    @Override
+    public void close() throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null || !activeReaders.isEmpty())
+                throw new IllegalStateException("cannot close while a writer or reader is active");
+
+            if (delOnClose) {
+                diskFile.delete();
+            }
+
+            if (debug)
+                logger.debug(this + " closed");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 1207466..9e98976 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kylin.job.inmemcubing;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -55,6 +56,7 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.GridTable;
import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.IGTStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,9 +158,12 @@ public class InMemCubeBuilder implements Runnable {
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
- // could use a real budget controller, but experiment shows write directly to disk is the same fast
- // GTMemDiskStore store = new GTMemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
- MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+
+ // The store implementations below perform very similarly; ConcurrentDiskStore is the simplest.
+ // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+ // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+ ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+
GridTable gridTable = new GridTable(info, store);
return gridTable;
}
@@ -566,7 +571,10 @@ public class InMemCubeBuilder implements Runnable {
}
private void closeStore(GridTable gt) throws IOException {
- ((MemDiskStore) gt.getStore()).close();
+ IGTStore store = gt.getStore();
+ if (store instanceof Closeable) {
+ ((Closeable) store).close();
+ }
}
// ===========================================================================
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
index 44376e5..4e6894e 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -61,7 +61,7 @@ public class MemDiskStore implements IGTStore, Closeable {
private Writer ongoingWriter;
public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
- this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+ this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true);
}
public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
new file mode 100644
index 0000000..7aa4640
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes the mock-up data into a {@link ConcurrentDiskStore} once, then scans it back from one
+ * or several threads and checks every record against the source data.
+ */
+public class ConcurrentDiskStoreTest {
+
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(1);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(5);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table, readThreads);
+    }
+
+    private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        int nThreads = readThreads;
+        // one slot per reader thread; each thread writes only its own slot and the main thread
+        // reads the slots after join(), so no further synchronization is needed
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int slot = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
+                        try {
+                            int n = 0;
+                            for (GTRecord r : scanner) {
+                                assertEquals(data.get(n++), r);
+                            }
+                            assertEquals(data.size(), n);
+                        } finally {
+                            // release the reader even on failure so the store can still be closed
+                            scanner.close();
+                        }
+                    } catch (Throwable ex) {
+                        // AssertionError is an Error, so catching Exception would miss it and the
+                        // test would pass no matter what the reader saw
+                        failures[slot] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        try {
+            for (int i = 0; i < nThreads; i++) {
+                if (failures[i] != null)
+                    throw new AssertionError("Reader thread " + i + " failed", failures[i]);
+            }
+        } finally {
+            ((ConcurrentDiskStore) table.getStore()).close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
deleted file mode 100644
index 9d044a0..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import org.apache.kylin.storage.gridtable.*;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class GTMemDiskStoreTest {
-
- final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
- final GTInfo info = SimpleGridTableTest.advancedInfo();
- final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
-
- @Test
- public void testSingleThreadWriteRead() throws IOException {
- long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead();
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- @Test
- public void testMultiThreadWriteRead() throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
-
- int nThreads = 5;
- Thread[] t = new Thread[nThreads];
- for (int i = 0; i < nThreads; i++) {
- t[i] = new Thread() {
- public void run() {
- try {
- verifyOneTableWriteAndRead();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- };
- t[i].start();
- }
- for (int i = 0; i < nThreads; i++) {
- t[i].join();
- }
-
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- private void verifyOneTableWriteAndRead() throws IOException {
- MemDiskStore store = new MemDiskStore(info, budgetCtrl);
- GridTable table = new GridTable(info, store);
- verifyWriteAndRead(table);
- }
-
- private void verifyWriteAndRead(GridTable table) throws IOException {
- GTInfo info = table.getInfo();
-
- GTBuilder builder = table.rebuild();
- for (GTRecord r : data) {
- builder.write(r);
- }
- builder.close();
-
- IGTScanner scanner = table.scan(new GTScanRequest(info));
- int i = 0;
- for (GTRecord r : scanner) {
- assertEquals(data.get(i++), r);
- }
- scanner.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
new file mode 100644
index 0000000..11cbb67
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes the mock-up data into a {@link MemDiskStore} and reads it back, from one thread and
+ * from several threads that each use their own store but share one memory budget controller.
+ */
+public class MemDiskStoreTest {
+
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        // one slot per worker; each worker writes only its own slot and the main thread reads
+        // the slots after join(), so no further synchronization is needed
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int slot = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        // AssertionError is an Error, so catching Exception would miss it and the
+                        // test would pass no matter what the worker saw
+                        failures[slot] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null)
+                throw new AssertionError("Worker thread " + i + " failed", failures[i]);
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead() throws IOException {
+        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+        try {
+            GridTable table = new GridTable(info, store);
+            verifyWriteAndRead(table);
+        } finally {
+            // the store is Closeable; close it so its resources do not outlive the test
+            store.close();
+        }
+    }
+
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        try {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+            assertEquals(data.size(), i);
+        } finally {
+            scanner.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
new file mode 100644
index 0000000..4b872a6
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gridtable;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.storage.gridtable.GTInfo.Builder;
+
+/**
+ * Shared fixtures for grid table tests: sample table layouts and generated records.
+ */
+public class UnitTestSupport {
+
+    /** Returns the sample layout exactly as produced by the common builder. */
+    public static GTInfo basicInfo() {
+        return infoBuilder().build();
+    }
+
+    /**
+     * Returns the sample layout with column blocks {0}, {1,2}, {3,4} and row blocks of size 4
+     * enabled on top of the common builder.
+     */
+    public static GTInfo advancedInfo() {
+        Builder b = infoBuilder();
+        ImmutableBitSet[] columnBlocks = { setOf(0), setOf(1, 2), setOf(3, 4) };
+        b.enableColumnBlock(columnBlocks);
+        b.enableRowBlock(4);
+        return b.build();
+    }
+
+    /** Common builder: five columns, column 0 as primary key and as the index-preferred column. */
+    private static Builder infoBuilder() {
+        Builder b = GTInfo.builder();
+        b.setCodeSystem(new GTSampleCodeSystem());
+        b.setColumns( //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        );
+        b.setPrimaryKey(setOf(0));
+        b.setColumnPreferIndex(setOf(0));
+        return b;
+    }
+
+    /**
+     * Generates records in rounds of ten. Only {@code nRows / 10} full rounds are produced, so a
+     * row count that is not a multiple of ten is rounded down. Each round shifts its four base
+     * dates by another four days.
+     */
+    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        // the ten records of one round: base date and name, in output order
+        final String[] baseDates = { "2015-01-14", "2015-01-14", //
+                "2015-01-15", "2015-01-15", "2015-01-15", //
+                "2015-01-16", "2015-01-16", "2015-01-16", "2015-01-16", //
+                "2015-01-17" };
+        final String[] names = { "Yang", "Luke", //
+                "Xu", "Dong", "Jason", //
+                "Mahone", "Shaofeng", "Qianhao", "George", //
+                "Kejia" };
+
+        List<GTRecord> records = new ArrayList<GTRecord>(nRows);
+        final int rounds = nRows / 10;
+        for (int round = 0; round < rounds; round++) {
+            int shiftDays = round * 4;
+            for (int k = 0; k < names.length; k++) {
+                String date = datePlus(baseDates[k], shiftDays);
+                records.add(newRec(info, date, names[k], "Food", new LongWritable(10), new BigDecimal("10.5")));
+            }
+        }
+        return records;
+    }
+
+    /** Adds {@code plusDays} days to a date string and formats the result as a date string. */
+    private static String datePlus(String date, int plusDays) {
+        final long millisPerDay = 1000L * 3600L * 24L;
+        long shifted = DateFormat.stringToMillis(date) + millisPerDay * plusDays;
+        return DateFormat.formatToDateStr(shifted);
+    }
+
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+        return new GTRecord(info).setValues(date, name, category, amount, price);
+    }
+
+    /** Builds an immutable bit set with exactly the given bits set. */
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet bits = new BitSet();
+        for (int k = 0; k < values.length; k++) {
+            bits.set(values[k]);
+        }
+        return new ImmutableBitSet(bits);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index bcf0f29..73f92c0 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -21,15 +21,11 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.math.BigDecimal;
-import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.storage.gridtable.GTInfo.Builder;
import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
import org.junit.Test;
@@ -37,7 +33,7 @@ public class SimpleGridTableTest {
@Test
public void testBasics() throws IOException {
- GTInfo info = basicInfo();
+ GTInfo info = UnitTestSupport.basicInfo();
GTSimpleMemStore store = new GTSimpleMemStore(info);
GridTable table = new GridTable(info, store);
@@ -49,7 +45,7 @@ public class SimpleGridTableTest {
@Test
public void testAdvanced() throws IOException {
- GTInfo info = advancedInfo();
+ GTInfo info = UnitTestSupport.advancedInfo();
GTSimpleMemStore store = new GTSimpleMemStore(info);
GridTable table = new GridTable(info, store);
@@ -61,7 +57,7 @@ public class SimpleGridTableTest {
@Test
public void testAggregate() throws IOException {
- GTInfo info = advancedInfo();
+ GTInfo info = UnitTestSupport.advancedInfo();
GTSimpleMemStore store = new GTSimpleMemStore(info);
GridTable table = new GridTable(info, store);
@@ -73,7 +69,7 @@ public class SimpleGridTableTest {
@Test
public void testAppend() throws IOException {
- GTInfo info = advancedInfo();
+ GTInfo info = UnitTestSupport.advancedInfo();
GTSimpleMemStore store = new GTSimpleMemStore(info);
GridTable table = new GridTable(info, store);
@@ -137,7 +133,7 @@ public class SimpleGridTableTest {
static GTBuilder rebuild(GridTable table) throws IOException {
GTBuilder builder = table.rebuild();
- for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+ for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) {
builder.write(rec);
}
builder.close();
@@ -147,41 +143,8 @@ public class SimpleGridTableTest {
return builder;
}
- public static List<GTRecord> mockupData(GTInfo info, int nRows) {
- List<GTRecord> result = new ArrayList<GTRecord>(nRows);
- int round = nRows / 10;
- for (int i = 0; i < round; i++) {
- String d_01_14 = datePlus("2015-01-14", i * 4);
- String d_01_15 = datePlus("2015-01-15", i * 4);
- String d_01_16 = datePlus("2015-01-16", i * 4);
- String d_01_17 = datePlus("2015-01-17", i * 4);
- result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
- }
- return result;
- }
-
- private static String datePlus(String date, int plusDays) {
- long millis = DateFormat.stringToMillis(date);
- millis += (1000L * 3600L * 24L) * plusDays;
- return DateFormat.formatToDateStr(millis);
- }
-
- private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
- GTRecord rec = new GTRecord(info);
- return rec.setValues(date, name, category, amount, price);
- }
-
static void rebuildViaAppend(GridTable table) throws IOException {
- List<GTRecord> data = mockupData(table.getInfo(), 10);
+ List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
GTBuilder builder;
int i = 0;
@@ -216,35 +179,6 @@ public class SimpleGridTableTest {
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
}
- public static GTInfo basicInfo() {
- Builder builder = infoBuilder();
- GTInfo info = builder.build();
- return info;
- }
-
- public static GTInfo advancedInfo() {
- Builder builder = infoBuilder();
- builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0), setOf(1, 2), setOf(3, 4) });
- builder.enableRowBlock(4);
- GTInfo info = builder.build();
- return info;
- }
-
- private static Builder infoBuilder() {
- Builder builder = GTInfo.builder();
- builder.setCodeSystem(new GTSampleCodeSystem());
- builder.setColumns( //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("bigint"), //
- DataType.getInstance("decimal") //
- );
- builder.setPrimaryKey(setOf(0));
- builder.setColumnPreferIndex(setOf(0));
- return builder;
- }
-
private static ImmutableBitSet setOf(int... values) {
BitSet set = new BitSet();
for (int i : values)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index 6cfb32f..ee08f2a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -48,7 +48,7 @@ public class SimpleInvertedIndexTest {
public SimpleInvertedIndexTest() {
- info = SimpleGridTableTest.advancedInfo();
+ info = UnitTestSupport.advancedInfo();
TblColRef colA = info.colRef(0);
// block i contains value "i", the last is NULL