You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 09:35:39 UTC

incubator-kylin git commit: KYLIN-860 always allow memory hungry distinct count in coprocessor

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 7389f89e1 -> c1f252495


KYLIN-860 always allow memory hungry distinct count in coprocessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c1f25249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c1f25249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c1f25249

Branch: refs/heads/0.8
Commit: c1f25249555787f9d481056286f40cadbd5c13e2
Parents: 7389f89
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:28:40 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:29:18 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 250 +++++++++++++++++++
 .../job/inmemcubing/DoggedCubeBuilder.java      |  21 +-
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |  29 +--
 .../kylin/job/inmemcubing/MemDiskStore.java     |   7 +-
 .../job/inmemcubing/MemoryBudgetController.java | 250 -------------------
 .../kylin/job/inmemcubing/MemDiskStoreTest.java |   1 +
 .../inmemcubing/MemoryBudgetControllerTest.java |   8 +-
 .../kylin/storage/hbase/CubeStorageEngine.java  |  24 +-
 .../hbase/coprocessor/AggregationCache.java     |  19 +-
 .../coprocessor/observer/ObserverEnabler.java   |  33 +--
 10 files changed, 313 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..b4394ae
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,250 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    public static interface MemoryConsumer {
+        // returns the number of MB released
+        int freeUp(int mb);
+    }
+
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB < 0)
+            throw new IllegalArgumentException();
+        if (totalBudgetMB > getSystemAvailMB())
+            throw new IllegalStateException();
+
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to avoid deadlock, don't hold the lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void tryFreeUp(int gap) {
+        // note: don't hold the lock when calling consumer.freeUp(); that method holds a lock of its own, which may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            if (entry == null) {
+                if (requestMB == 0)
+                    return true;
+
+                entry = new ConsumerEntry(consumer);
+                booking.put(consumer, entry);
+            }
+
+            int delta = requestMB - entry.reservedMB;
+            return updateBookingWithDelta(consumer, delta);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            if (delta <= 0)
+                return true;
+
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        // double check gap again, it may be changed by other concurrent requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index b0c3d5c..4fc3be2 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -16,21 +16,10 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
@@ -44,7 +33,11 @@ import org.apache.kylin.storage.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * When base cuboid does not fit in memory, cut the input into multiple splits and merge the split outputs at last.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 3c3d834..7576b04 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,22 +16,13 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -45,18 +36,14 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.GTAggregateScanner;
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
index 4e6894e..badb14f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -17,7 +17,7 @@
 
 package org.apache.kylin.job.inmemcubing;
 
-import static org.apache.kylin.job.inmemcubing.MemoryBudgetController.*;
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -34,8 +34,9 @@ import java.nio.file.StandardOpenOption;
 import java.util.NoSuchElementException;
 
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
 import org.apache.kylin.storage.gridtable.GTInfo;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTRowBlock;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
deleted file mode 100644
index 4b50160..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
-    private static final boolean debug = true;
-
-    public static interface MemoryConsumer {
-        // return number MB released
-        int freeUp(int mb);
-    }
-
-    @SuppressWarnings("serial")
-    public static class NotEnoughBudgetException extends IllegalStateException {
-
-        public NotEnoughBudgetException() {
-            super();
-        }
-
-        public NotEnoughBudgetException(Throwable cause) {
-            super(cause);
-        }
-    }
-
-    private static class ConsumerEntry {
-        final MemoryConsumer consumer;
-        int reservedMB;
-
-        ConsumerEntry(MemoryConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
-    public static final int ONE_MB = 1024 * 1024;
-
-    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
-    // all budget numbers are in MB
-    private final int totalBudgetMB;
-    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-    private int totalReservedMB;
-    private final ReentrantLock lock = new ReentrantLock();
-
-    public MemoryBudgetController(int totalBudgetMB) {
-        if (totalBudgetMB < 0)
-            throw new IllegalArgumentException();
-        if (totalBudgetMB > getSystemAvailMB())
-            throw new IllegalStateException();
-
-        this.totalBudgetMB = totalBudgetMB;
-        this.totalReservedMB = 0;
-    }
-
-    public int getTotalBudgetMB() {
-        return totalBudgetMB;
-    }
-
-    public int getTotalReservedMB() {
-        lock.lock();
-        try {
-            return totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getRemainingBudgetMB() {
-        lock.lock();
-        try {
-            return totalBudgetMB - totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
-        long waitStart = 0;
-        while (true) {
-            try {
-                reserve(consumer, requestMB);
-                if (debug && waitStart > 0)
-                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
-                return;
-            } catch (NotEnoughBudgetException ex) {
-                // retry
-            }
-
-            if (waitStart == 0)
-                waitStart = System.currentTimeMillis();
-
-            synchronized (lock) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    throw new NotEnoughBudgetException(e);
-                }
-            }
-        }
-    }
-
-    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
-    public void reserve(MemoryConsumer consumer, int requestMB) {
-        if (totalBudgetMB == 0 && requestMB > 0)
-            throw new NotEnoughBudgetException();
-
-        boolean ok = false;
-        while (!ok) {
-            int gap = calculateGap(consumer, requestMB);
-            if (gap > 0) {
-                // to void deadlock, don't hold lock when invoking consumer.freeUp()
-                tryFreeUp(gap);
-            }
-            ok = updateBooking(consumer, requestMB);
-        }
-    }
-
-    private int calculateGap(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            int curMB = entry == null ? 0 : entry.reservedMB;
-            int delta = requestMB - curMB;
-            return delta - (totalBudgetMB - totalReservedMB);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void tryFreeUp(int gap) {
-        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
-        for (ConsumerEntry entry : booking.values()) {
-            int mb = entry.consumer.freeUp(gap);
-            if (mb > 0) {
-                lock.lock();
-                try {
-                    updateBookingWithDelta(entry.consumer, -mb);
-                } finally {
-                    lock.unlock();
-                }
-                gap -= mb;
-                if (gap <= 0)
-                    break;
-            }
-        }
-        if (gap > 0)
-            throw new NotEnoughBudgetException();
-
-        if (debug) {
-            if (getSystemAvailMB() < getRemainingBudgetMB()) {
-                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
-            }
-        }
-    }
-
-    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            if (entry == null) {
-                if (requestMB == 0)
-                    return true;
-
-                entry = new ConsumerEntry(consumer);
-                booking.put(consumer, entry);
-            }
-
-            int delta = requestMB - entry.reservedMB;
-            return updateBookingWithDelta(consumer, delta);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // lock MUST be obtained before entering
-    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
-        if (delta == 0)
-            return true;
-
-        ConsumerEntry entry = booking.get(consumer);
-        if (entry == null) {
-            if (delta <= 0)
-                return true;
-
-            entry = new ConsumerEntry(consumer);
-            booking.put(consumer, entry);
-        }
-
-        // double check gap again, it may be changed by other concurrent requests
-        if (delta > 0) {
-            int gap = delta - (totalBudgetMB - totalReservedMB);
-            if (gap > 0)
-                return false;
-        }
-
-        totalReservedMB += delta;
-        entry.reservedMB += delta;
-        if (entry.reservedMB == 0) {
-            booking.remove(entry.consumer);
-        }
-        if (debug) {
-            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
-        }
-
-        if (delta < 0) {
-            synchronized (lock) {
-                lock.notifyAll();
-            }
-        }
-
-        return true;
-    }
-
-    public static long getSystemAvailBytes() {
-        Runtime runtime = Runtime.getRuntime();
-        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
-        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
-        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
-        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
-        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
-        return availableMemory;
-    }
-
-    public static int getSystemAvailMB() {
-        return (int) (getSystemAvailBytes() / ONE_MB);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
index 11cbb67..fd848f2 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.storage.gridtable.GTBuilder;
 import org.apache.kylin.storage.gridtable.GTInfo;
 import org.apache.kylin.storage.gridtable.GTRecord;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
index e6a1fb4..6f86c95 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.kylin.job.inmemcubing;
 
-import static org.junit.Assert.*;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
 
 import java.util.ArrayList;
 
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class MemoryBudgetControllerTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index d5132bb..fdca42b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -18,18 +18,10 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -63,10 +55,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
 
 /**
  * @author xjiang, yangli9
@@ -138,7 +127,8 @@ public class CubeStorageEngine implements ICachableStorageEngine {
         // check involved measures, build value decoder for each each family:column
         List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
 
-        setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+        // memory-hungry distinct counts are pushed down to the coprocessor, so there is no need to set a threshold any more
+        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 2553dfd..d540084 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -18,17 +18,18 @@
 
 package org.apache.kylin.storage.hbase.coprocessor;
 
-import java.util.Map;
-
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 
-import com.google.common.collect.Maps;
+import java.util.Map;
 
 /**
  */
 @SuppressWarnings("rawtypes")
 public abstract class AggregationCache {
-    static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+    static final long MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+    static final long MEMOERY_MAX_BYTES = Runtime.getRuntime().maxMemory();
     protected final Map<AggrKey, MeasureAggregator[]> aggBufMap;
     transient int rowMemBytes;
     private AggrKey firstKey = null;
@@ -73,9 +74,15 @@ public abstract class AggregationCache {
             }
         }
         int size = aggBufMap.size();
-        int memUsage = (40 + rowMemBytes) * size;
+        long memUsage = (40 + rowMemBytes) * size;
         if (memUsage > MEMORY_USAGE_CAP) {
-            throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+            throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+        }
+
+        // Abort if the available memory has dropped below 5% of the JVM's max memory (MEMOERY_MAX_BYTES / 20)
+        long avail = MemoryBudgetController.getSystemAvailBytes();
+        if (avail < (MEMOERY_MAX_BYTES / 20)) {
+            throw new RuntimeException("Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 9b3753d..420e4c4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -18,14 +18,8 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -44,10 +38,22 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.RegionScannerAdapter;
 import org.apache.kylin.storage.hbase.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * @author yangli9
@@ -95,7 +101,6 @@ public class ObserverEnabler {
 
     private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
 
-
         String forceFlag = System.getProperty(FORCE_COPROCESSOR);
         if (forceFlag != null) {
             return Boolean.parseBoolean(forceFlag);
@@ -106,10 +111,10 @@ public class ObserverEnabler {
             return cubeOverride.booleanValue();
         }
 
-        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
-            logger.info("Coprocessor is disabled because there is memory hungry count distinct");
-            return false;
-        }
+        //        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
+        //            logger.info("Coprocessor is disabled because there is memory hungry count distinct");
+        //            return false;
+        //        }
 
         if (context.isExactAggregation()) {
             logger.info("Coprocessor is disabled because exactAggregation is true");