You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/27 14:29:36 UTC

incubator-kylin git commit: MemDiskStore write function done

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-786 [created] 7945a1e49


MemDiskStore write function done


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7945a1e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7945a1e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7945a1e4

Branch: refs/heads/KYLIN-786
Commit: 7945a1e494dd899ab621f4e8082cf47ff7de4456
Parents: 9efb473
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 20:29:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed May 27 20:29:02 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 136 +++++++
 .../common/util/MemoryBudgetControllerTest.java |  60 ++++
 .../gridtable/memstore/MemDiskStore.java        | 357 +++++++++++++++++++
 3 files changed, 553 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..dab5607
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,136 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps a ledger of memory reservations, in MB, made by {@link MemoryConsumer}s against a fixed total budget.
+ * When a request does not fit, the other registered consumers are asked to free memory; if they cannot free
+ * enough, {@link NotEnoughBudgetException} is thrown.
+ *
+ * All budget numbers are in MB. Updates to the ledger happen under this object's monitor.
+ */
+public class MemoryBudgetController {
+
+    /** A holder of booked memory that can be asked to give some of it back. */
+    public static interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    /** Thrown by {@link #reserve} when the other consumers could not free enough budget. */
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+    }
+
+    /** Ledger line: one consumer and the MB currently booked for it. */
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final int ONE_MB = 1024 * 1024;
+    // MB subtracted from the JVM's available memory before comparing it against a request
+    public static final int SYSTEM_RESERVED = 200;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+
+    private int totalReservedMB;
+
+    /**
+     * @param totalBudgetMB total budget in MB, must be positive
+     * @throws IllegalArgumentException if the budget is not positive
+     * @throws IllegalStateException if the JVM currently has less than budget + SYSTEM_RESERVED MB available
+     */
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB <= 0)
+            throw new IllegalArgumentException();
+        if (checkSystemAvailMB(totalBudgetMB) == false)
+            throw new IllegalStateException();
+
+        this.totalBudgetMB = totalBudgetMB;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        return totalReservedMB;
+    }
+
+    /**
+     * Sets the reservation of a consumer to requestMB (an absolute number, not an increment).
+     * Growing a reservation may call freeUp() on other consumers; shrinking never does.
+     *
+     * @throws NotEnoughBudgetException if growing is requested and the other consumers cannot free enough
+     */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            // use the result of putIfAbsent; re-reading the map could return null if the entry got removed meanwhile
+            ConsumerEntry fresh = new ConsumerEntry(consumer);
+            ConsumerEntry existing = booking.putIfAbsent(consumer, fresh);
+            entry = (existing == null) ? fresh : existing;
+        }
+
+        int delta = requestMB - entry.reservedMB;
+
+        if (delta > 0) {
+            checkFreeMemoryAndUpdateBooking(entry, delta);
+        } else {
+            updateBooking(entry, delta);
+        }
+    }
+
+    /** Applies delta (positive or negative) to both the total and the consumer's own booking. */
+    synchronized private void updateBooking(ConsumerEntry entry, int delta) {
+        totalReservedMB += delta;
+        // the entry must move in the same direction as the total, or the ledger falls apart
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (delta < 0) {
+            // budget was released, wake up a thread sleeping in checkFreeMemoryAndUpdateBooking()
+            this.notify();
+        }
+    }
+
+    /**
+     * Books delta more MB for the requester, asking other consumers to free up when the budget is short,
+     * and waiting until the JVM really has that much memory available.
+     */
+    synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry requester, int delta) {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                // if budget is not enough, try free up
+                if (delta > totalBudgetMB - totalReservedMB) {
+                    int freeUpToGo = delta;
+                    for (ConsumerEntry entry : booking.values()) {
+                        if (entry.consumer != requester.consumer) {
+                            int mb = entry.consumer.freeUp(freeUpToGo);
+                            updateBooking(entry, -mb);
+                            freeUpToGo -= mb;
+                            if (freeUpToGo <= 0)
+                                break;
+                        }
+                    }
+                    if (freeUpToGo > 0)
+                        throw new NotEnoughBudgetException();
+
+                    Runtime.getRuntime().gc();
+                }
+
+                if (checkSystemAvailMB(delta))
+                    break;
+
+                try {
+                    logger.debug("Memory budget has " + (totalBudgetMB - totalReservedMB) + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+                    this.wait(200);
+                } catch (InterruptedException e) {
+                    logger.error("Interrupted while wait free memory", e);
+                    // remember it, but do not re-interrupt here or the next wait() would return immediately
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+
+        updateBooking(requester, delta);
+    }
+
+    private boolean checkSystemAvailMB(int mb) {
+        return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+    }
+
+    /** Returns max heap minus currently used heap, in whole MB. */
+    public static int getSystemAvailMB() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        int availMB = (int) (availableMemory / ONE_MB);
+        return availMB;
+    }
+
+    public static int getMaxPossibleBudget() {
+        return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..bc2eadd
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+/**
+ * Books the whole budget one MB at a time, then checks that a single big reservation
+ * forces every small holder to let go, and that a further request is rejected.
+ */
+public class MemoryBudgetControllerTest {
+
+    @Test
+    public void test() {
+        final int budgetMB = MemoryBudgetController.getMaxPossibleBudget() / 2;
+        final MemoryBudgetController controller = new MemoryBudgetController(budgetMB);
+
+        // fill the budget completely, the ledger must follow every step
+        ArrayList<OneMB> holders = new ArrayList<OneMB>();
+        while (holders.size() < budgetMB) {
+            holders.add(new OneMB(controller));
+            assertEquals(holders.size(), controller.getTotalReservedMB());
+        }
+
+        // a newcomer asking for everything must push all holders out
+        controller.reserve(new OneMB(), budgetMB);
+
+        for (OneMB holder : holders) {
+            assertEquals(null, holder.data);
+        }
+
+        // nothing is left to free up, so even one more MB has to be refused
+        boolean refused;
+        try {
+            controller.reserve(new OneMB(), 1);
+            refused = false;
+        } catch (IllegalStateException ex) {
+            // expected
+            refused = true;
+        }
+        if (!refused) {
+            fail();
+        }
+    }
+
+    /** A consumer that holds about one MB of data, or nothing at all. */
+    class OneMB implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        OneMB() {
+        }
+
+        OneMB(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
+        }
+
+        /** Drops the data if still held; reports 1 MB released in that case, 0 otherwise. */
+        @Override
+        public int freeUp(int mb) {
+            if (data == null) {
+                return 0;
+            }
+            data = null;
+            return 1;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7945a1e4/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
new file mode 100644
index 0000000..b5a58ca
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
@@ -0,0 +1,357 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemDiskStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+
+    final GTInfo info;
+    final MemPart memPart;
+    final DiskPart diskPart;
+
+    /**
+     * Creates a store made of a memory part, which books its memory through the given budget controller,
+     * and a disk part backed by the given file.
+     *
+     * @param info       table info, returned as-is by {@link #getInfo()}
+     * @param diskFile   file handed to the disk part, which opens a channel on it
+     * @param budgetCtrl controller handed to the memory part
+     * @throws IOException if the disk part fails to open its file
+     */
+    public MemDiskStore(GTInfo info, File diskFile, MemoryBudgetController budgetCtrl) throws IOException {
+        this.info = info;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile);
+    }
+
+    /** Returns the table info this store was constructed with. */
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * Returns a writer that starts at offset 0. The shard argument is not used here.
+     */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return new Writer(0);
+    }
+
+    /**
+     * Returns a writer that starts at the larger of the memory part's and the disk part's tail offset.
+     * Neither shard nor fillLast is used here.
+     *
+     * NOTE(review): the Writer constructor clears both parts after this offset has been computed, so the
+     * existing content does not survive -- confirm whether that is intended for an append.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return new Writer(Math.max(memPart.tailOffset(), diskPart.tailOffset));
+    }
+
+    /**
+     * Not implemented yet: always returns null and ignores all arguments.
+     */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Writes row blocks as a byte stream. Bytes go to the memory part for as long as it accepts them;
+     * once the memory part refuses a write (returns 0), all further bytes go straight to the disk part.
+     */
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        boolean memPartActivated = true;
+
+        /**
+         * Clears both parts, activates the memory part and starts writing at the given offset.
+         *
+         * @param startOffset offset of the first byte this writer will write
+         */
+        Writer(final long startOffset) throws IOException {
+            memPart.clear();
+            diskPart.clear();
+
+            memPart.activateWrite();
+
+            OutputStream out = new OutputStream() {
+                long diskOffset = startOffset;
+                final byte[] tmp = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    while (length > 0) {
+                        if (memPartActivated) {
+                            int n = memPart.write(bytes, offset, length, diskOffset);
+                            offset += n;
+                            length -= n;
+                            diskOffset += n;
+                            if (n == 0) {
+                                // memory part took nothing, fall through to disk on the next round
+                                memPartActivated = false;
+                            }
+                        } else {
+                            diskPart.write(diskOffset, bytes, offset, length);
+                            diskOffset += length;
+                            // everything left was handed to the disk part; without this the loop never ends
+                            offset += length;
+                            length = 0;
+                        }
+                    }
+                }
+            };
+            dout = new DataOutputStream(out);
+        }
+
+        /** Exports the block into the underlying stream. */
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        /** Stops writing to the memory part and waits for its pending bytes to reach the disk part. */
+        @Override
+        public void close() throws IOException {
+            memPart.finishAsyncFlush();
+        }
+    }
+
+    /**
+     * One node of a singly linked list of byte buffers. The first {@code length} bytes of {@code data}
+     * are in use and correspond to the offsets starting at {@code diskOffset}.
+     */
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        /** True when no unused byte is left in the buffer. */
+        boolean isFull() {
+            return freeSpace() == 0;
+        }
+
+        /** Offset of the first byte of this chunk. */
+        long headOffset() {
+            return diskOffset;
+        }
+
+        /** Offset right after the last used byte of this chunk. */
+        long tailOffset() {
+            return headOffset() + length;
+        }
+
+        /** Number of bytes still unused in the buffer. */
+        int freeSpace() {
+            final int capacity = data.length;
+            return capacity - length;
+        }
+    }
+
+    /**
+     * The in-memory part of the store: a linked list of chunks that accepts writes at its tail only,
+     * books one MB per chunk with the budget controller, and copies written bytes to the disk part
+     * from a background thread.
+     *
+     * Locking: the chunk list and the flush cursor are guarded by this object's monitor. The flusher
+     * thread is woken through {@code flushSignal}. The only nesting is MemPart monitor first, then
+     * {@code flushSignal}; the flusher never holds {@code flushSignal} while taking the MemPart monitor.
+     *
+     * NOTE(review): write() calls budgetCtrl.reserve() while holding this monitor, and the controller
+     * calls freeUp() while holding its own; with several MemParts on one controller this lock order can
+     * invert -- confirm how callers are threaded.
+     */
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk; // chunk the flusher is working on, null when it has caught up
+        long asyncFlushDiskOffset; // offset of the next byte to flush
+        volatile Throwable asyncFlushException;
+
+        // wake-up channel for the flusher; flushRequested is guarded by flushSignal
+        final Object flushSignal = new Object();
+        boolean flushRequested;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        @SuppressWarnings("unused")
+        long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        /**
+         * Copies up to length bytes into the last chunk, allocating a new chunk when needed.
+         *
+         * @return number of bytes accepted, which may be less than length; 0 when writing is deactivated,
+         *         when diskOffset is not the current tail, or when a new chunk could not be booked
+         */
+        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            if (writeActivated == false)
+                return 0;
+
+            // write is only expected at the tail
+            if (diskOffset != tailOffset())
+                return 0;
+
+            if (chunkCount == 0 || lastChunk.isFull()) {
+                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+                if (writeActivated == false)
+                    return 0;
+            }
+
+            int n = Math.min(lastChunk.freeSpace(), length);
+            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+            lastChunk.length += n;
+
+            asyncFlush(lastChunk, diskOffset, n);
+
+            return n;
+        }
+
+        /**
+         * Starts the flusher thread on first use, points the flush cursor at the just written bytes if the
+         * flusher had caught up, and wakes the flusher once at least 4096 bytes are pending.
+         * Must be called with the MemPart monitor held.
+         */
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    public void run() {
+                        try {
+                            while (writeActivated) {
+                                synchronized (flushSignal) {
+                                    // re-check the condition under the lock, so a signal sent before we got here is not lost
+                                    while (writeActivated && !flushRequested) {
+                                        try {
+                                            flushSignal.wait();
+                                        } catch (InterruptedException e) {
+                                            // nobody is expected to interrupt this private thread; treat it as a wake-up
+                                            break;
+                                        }
+                                    }
+                                    flushRequested = false;
+                                }
+                                flushToDiskPart();
+                            }
+                            // final round, picks up whatever was written before deactivation
+                            flushToDiskPart();
+                        } catch (Throwable ex) {
+                            asyncFlushException = ex;
+                        }
+                    }
+                };
+                asyncFlusher.start();
+            }
+
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            // flush in batch
+            if (diskOffset + n >= asyncFlushDiskOffset + 4096) {
+                synchronized (flushSignal) {
+                    flushRequested = true;
+                    flushSignal.notifyAll();
+                }
+            }
+        }
+
+        /**
+         * Copies everything between the flush cursor and the tail to the disk part. The cursor is advanced
+         * under the MemPart monitor, the disk write itself happens outside of it.
+         */
+        private void flushToDiskPart() throws IOException {
+            byte[] data;
+            long writeAt = 0;
+            int offset = 0;
+            int length = 0;
+
+            while (true) {
+                data = null;
+                synchronized (this) {
+                    asyncFlushDiskOffset += length; // bytes written in last loop
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                        writeAt = asyncFlushDiskOffset; // snapshot under the lock
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                diskPart.write(writeAt, data, offset, length);
+            }
+        }
+
+        /** Books one more MB and appends a fresh chunk; deactivates writing if the budget is refused. */
+        private void allocateNewMemChunk(long diskOffset) {
+            try {
+                budgetCtrl.reserve(this, chunkCount + 1);
+
+                MemChunk chunk = new MemChunk();
+                chunk.diskOffset = diskOffset;
+                chunk.data = new byte[ONE_MB - 48]; // -48 for MemChunk overhead
+                if (chunkCount == 0) {
+                    firstChunk = lastChunk = chunk;
+                } else {
+                    lastChunk.next = chunk;
+                    lastChunk = chunk;
+                }
+                chunkCount++;
+
+            } catch (NotEnoughBudgetException ex) {
+                deactivateWrite();
+            }
+        }
+
+        /**
+         * Deactivates writing, lets the flusher do its final round and waits for it to end.
+         * Deliberately NOT synchronized on the MemPart: the flusher needs that monitor to finish,
+         * so joining it while holding the monitor would deadlock.
+         *
+         * @throws IOException the failure of the flusher thread, wrapped unless it already is an IOException
+         */
+        public void finishAsyncFlush() throws IOException {
+            deactivateWrite(); // also wakes the flusher
+
+            Thread flusher;
+            synchronized (this) {
+                flusher = asyncFlusher;
+            }
+            if (flusher == null)
+                return;
+
+            try {
+                flusher.join();
+            } catch (InterruptedException e) {
+                logger.warn("", e);
+                Thread.currentThread().interrupt();
+            }
+
+            Throwable failure;
+            synchronized (this) {
+                asyncFlusher = null;
+                failure = asyncFlushException;
+                asyncFlushException = null; // reported once, do not blame the next flusher for it
+            }
+
+            if (failure != null) {
+                if (failure instanceof IOException)
+                    throw (IOException) failure;
+                else
+                    throw new IOException(failure);
+            }
+        }
+
+        /**
+         * Drops chunks from the head of the list, one MB each, stopping at the chunk the flusher is
+         * currently working on.
+         *
+         * @return number of chunks (MB) dropped, at most mb
+         */
+        @Override
+        synchronized public int freeUp(int mb) {
+            int mbReleased = 0;
+            while (chunkCount > 0 && mbReleased < mb) {
+                if (firstChunk == asyncFlushChunk)
+                    break;
+
+                mbReleased++;
+                chunkCount--;
+                if (chunkCount == 0)
+                    firstChunk = lastChunk = null;
+                else
+                    firstChunk = firstChunk.next;
+            }
+            return mbReleased;
+        }
+
+        void activateWrite() {
+            writeActivated = true;
+        }
+
+        void deactivateWrite() {
+            writeActivated = false;
+            // let a sleeping flusher notice the change
+            synchronized (flushSignal) {
+                flushSignal.notifyAll();
+            }
+        }
+
+        /**
+         * Forgets all chunks and resets the flush cursor.
+         *
+         * NOTE(review): the MB booked with the budget controller are not released here -- confirm whether
+         * a budgetCtrl.reserve(this, 0) is wanted.
+         */
+        synchronized public void clear() {
+            chunkCount = 0;
+            firstChunk = lastChunk = null;
+            // a cursor pointing into the dropped chunks must not be picked up by the next flush
+            asyncFlushChunk = null;
+            asyncFlushDiskOffset = 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+            finishAsyncFlush();
+            clear();
+        }
+
+    }
+
+    private class DiskPart implements Closeable {
+        final File diskFile;
+        final FileChannel channel;
+        long tailOffset;
+
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            this.channel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE, StandardOpenOption.READ);
+        }
+
+        void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+            tailOffset += length;
+        }
+
+        void clear() throws IOException {
+            tailOffset = 0;
+            channel.truncate(0);
+        }
+
+        @Override
+        public void close() throws IOException {
+            channel.close();
+        }
+    }
+
+}