You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 09:35:39 UTC
incubator-kylin git commit: KYLIN-860 always allow memory hungry
distinct count in coprocessor
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 7389f89e1 -> c1f252495
KYLIN-860 always allow memory hungry distinct count in coprocessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c1f25249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c1f25249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c1f25249
Branch: refs/heads/0.8
Commit: c1f25249555787f9d481056286f40cadbd5c13e2
Parents: 7389f89
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:28:40 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:29:18 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 250 +++++++++++++++++++
.../job/inmemcubing/DoggedCubeBuilder.java | 21 +-
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 29 +--
.../kylin/job/inmemcubing/MemDiskStore.java | 7 +-
.../job/inmemcubing/MemoryBudgetController.java | 250 -------------------
.../kylin/job/inmemcubing/MemDiskStoreTest.java | 1 +
.../inmemcubing/MemoryBudgetControllerTest.java | 8 +-
.../kylin/storage/hbase/CubeStorageEngine.java | 24 +-
.../hbase/coprocessor/AggregationCache.java | 19 +-
.../coprocessor/observer/ObserverEnabler.java | 33 +--
10 files changed, 313 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..b4394ae
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Book-keeps memory reservations, expressed in MB, that {@link MemoryConsumer}s make against a
+ * fixed total budget. When a request does not fit into the remaining budget, the consumers that
+ * currently hold a booking are asked, one by one, to free memory via
+ * {@link MemoryConsumer#freeUp(int)}.
+ *
+ * Two distinct synchronization mechanisms are used on the same {@code lock} object:
+ * the {@link ReentrantLock} itself guards {@code totalReservedMB} and the booking updates, while
+ * the object monitor of {@code lock} ({@code synchronized}/{@code wait}/{@code notifyAll}) is used
+ * only to park and wake callers of {@link #reserveInsist(MemoryConsumer, int)}.
+ */
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    /** A holder of reserved memory that can be asked to give some of it back. */
+    public static interface MemoryConsumer {
+        /**
+         * Asks the consumer to release memory. Invoked without the controller's lock held.
+         *
+         * @param mb the amount, in MB, that is still missing for the pending request
+         * @return number MB released
+         */
+        int freeUp(int mb);
+    }
+
+    /** Thrown when a reservation cannot be satisfied even after asking consumers to free up. */
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** Booking record of one consumer; {@code reservedMB} is only modified while holding {@code lock}. */
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    // Number of releases so far. Read and written only inside synchronized (lock). It lets
+    // reserveInsist() notice a release that happened between its failed attempt and its wait.
+    private long releaseCount;
+
+    /**
+     * @param totalBudgetMB the budget to manage, in MB
+     * @throws IllegalArgumentException if {@code totalBudgetMB} is negative
+     * @throws IllegalStateException if {@code totalBudgetMB} exceeds {@link #getSystemAvailMB()}
+     */
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB < 0)
+            throw new IllegalArgumentException("totalBudgetMB must not be negative: " + totalBudgetMB);
+        int systemAvailMB = getSystemAvailMB();
+        if (totalBudgetMB > systemAvailMB)
+            throw new IllegalStateException("Budget of " + totalBudgetMB + " MB exceeds the " + systemAvailMB + " MB currently available");
+
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Reserves like {@link #reserve(MemoryConsumer, int)}, but instead of failing it blocks until
+     * some budget is released and then tries again.
+     *
+     * @throws NotEnoughBudgetException if the waiting thread is interrupted; the interrupt status
+     *             of the thread is preserved and the {@link InterruptedException} is the cause
+     */
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            // Snapshot the release counter BEFORE trying, so a release that slips in between
+            // the failed attempt and the wait below is detected instead of being lost.
+            long seenReleases;
+            synchronized (lock) {
+                seenReleases = releaseCount;
+            }
+
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    // guarded wait: skips waiting if a release already happened, and goes back
+                    // to sleep on a spurious wakeup
+                    while (seenReleases == releaseCount)
+                        lock.wait();
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to callers further up the stack
+                    Thread.currentThread().interrupt();
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to avoid deadlock, don't hold lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            // may still fail if a concurrent request took the budget in the meantime; loop then
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    /** Returns how many MB are missing to grow the consumer's booking to requestMB; <= 0 means it fits. */
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Asks the booked consumers in turn to free memory until the gap is covered.
+     *
+     * @throws NotEnoughBudgetException if the consumers together released less than the gap
+     */
+    private void tryFreeUp(int gap) {
+        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    /**
+     * Sets the consumer's booking to requestMB if the budget allows it.
+     *
+     * @return false if the budget no longer suffices; nothing is booked in that case
+     */
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            // the entry, if needed, is created by updateBookingWithDelta() only after the
+            // budget check passed, so a rejected request leaves no empty booking behind
+            return updateBookingWithDelta(consumer, requestMB - curMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null && delta < 0)
+            return true; // nothing booked for this consumer, so nothing to release
+
+        // double check gap again, it may be changed by other concurrent requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        if (entry == null) {
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+
+        if (delta < 0) {
+            synchronized (lock) {
+                // bump the counter under the same monitor the waiters use, then wake them
+                releaseCount++;
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    /** Returns the max heap size minus the heap currently in use, in bytes, as reported by {@link Runtime}. */
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    /** Same as {@link #getSystemAvailBytes()}, converted to whole MB. */
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index b0c3d5c..4fc3be2 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -16,21 +16,10 @@
*/
package org.apache.kylin.job.inmemcubing;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
@@ -44,7 +33,11 @@ import org.apache.kylin.storage.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* When base cuboid does not fit in memory, cut the input into multiple splits and merge the split outputs at last.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 3c3d834..7576b04 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,22 +16,13 @@
*/
package org.apache.kylin.job.inmemcubing;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -45,18 +36,14 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.GTAggregateScanner;
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
index 4e6894e..badb14f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -17,7 +17,7 @@
package org.apache.kylin.job.inmemcubing;
-import static org.apache.kylin.job.inmemcubing.MemoryBudgetController.*;
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -34,8 +34,9 @@ import java.nio.file.StandardOpenOption;
import java.util.NoSuchElementException;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTRowBlock;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
deleted file mode 100644
index 4b50160..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
- private static final boolean debug = true;
-
- public static interface MemoryConsumer {
- // return number MB released
- int freeUp(int mb);
- }
-
- @SuppressWarnings("serial")
- public static class NotEnoughBudgetException extends IllegalStateException {
-
- public NotEnoughBudgetException() {
- super();
- }
-
- public NotEnoughBudgetException(Throwable cause) {
- super(cause);
- }
- }
-
- private static class ConsumerEntry {
- final MemoryConsumer consumer;
- int reservedMB;
-
- ConsumerEntry(MemoryConsumer consumer) {
- this.consumer = consumer;
- }
- }
-
- public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
- public static final int ONE_MB = 1024 * 1024;
-
- private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
- // all budget numbers are in MB
- private final int totalBudgetMB;
- private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
- private int totalReservedMB;
- private final ReentrantLock lock = new ReentrantLock();
-
- public MemoryBudgetController(int totalBudgetMB) {
- if (totalBudgetMB < 0)
- throw new IllegalArgumentException();
- if (totalBudgetMB > getSystemAvailMB())
- throw new IllegalStateException();
-
- this.totalBudgetMB = totalBudgetMB;
- this.totalReservedMB = 0;
- }
-
- public int getTotalBudgetMB() {
- return totalBudgetMB;
- }
-
- public int getTotalReservedMB() {
- lock.lock();
- try {
- return totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public int getRemainingBudgetMB() {
- lock.lock();
- try {
- return totalBudgetMB - totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public void reserveInsist(MemoryConsumer consumer, int requestMB) {
- long waitStart = 0;
- while (true) {
- try {
- reserve(consumer, requestMB);
- if (debug && waitStart > 0)
- logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
- return;
- } catch (NotEnoughBudgetException ex) {
- // retry
- }
-
- if (waitStart == 0)
- waitStart = System.currentTimeMillis();
-
- synchronized (lock) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- throw new NotEnoughBudgetException(e);
- }
- }
- }
- }
-
- /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
- public void reserve(MemoryConsumer consumer, int requestMB) {
- if (totalBudgetMB == 0 && requestMB > 0)
- throw new NotEnoughBudgetException();
-
- boolean ok = false;
- while (!ok) {
- int gap = calculateGap(consumer, requestMB);
- if (gap > 0) {
- // to void deadlock, don't hold lock when invoking consumer.freeUp()
- tryFreeUp(gap);
- }
- ok = updateBooking(consumer, requestMB);
- }
- }
-
- private int calculateGap(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- int curMB = entry == null ? 0 : entry.reservedMB;
- int delta = requestMB - curMB;
- return delta - (totalBudgetMB - totalReservedMB);
- } finally {
- lock.unlock();
- }
- }
-
- private void tryFreeUp(int gap) {
- // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
- for (ConsumerEntry entry : booking.values()) {
- int mb = entry.consumer.freeUp(gap);
- if (mb > 0) {
- lock.lock();
- try {
- updateBookingWithDelta(entry.consumer, -mb);
- } finally {
- lock.unlock();
- }
- gap -= mb;
- if (gap <= 0)
- break;
- }
- }
- if (gap > 0)
- throw new NotEnoughBudgetException();
-
- if (debug) {
- if (getSystemAvailMB() < getRemainingBudgetMB()) {
- logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
- }
- }
- }
-
- private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (requestMB == 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- int delta = requestMB - entry.reservedMB;
- return updateBookingWithDelta(consumer, delta);
- } finally {
- lock.unlock();
- }
- }
-
- // lock MUST be obtained before entering
- private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
- if (delta == 0)
- return true;
-
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (delta <= 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- // double check gap again, it may be changed by other concurrent requests
- if (delta > 0) {
- int gap = delta - (totalBudgetMB - totalReservedMB);
- if (gap > 0)
- return false;
- }
-
- totalReservedMB += delta;
- entry.reservedMB += delta;
- if (entry.reservedMB == 0) {
- booking.remove(entry.consumer);
- }
- if (debug) {
- logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
- }
-
- if (delta < 0) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
-
- return true;
- }
-
- public static long getSystemAvailBytes() {
- Runtime runtime = Runtime.getRuntime();
- long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
- long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
- long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
- long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
- long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
- return availableMemory;
- }
-
- public static int getSystemAvailMB() {
- return (int) (getSystemAvailBytes() / ONE_MB);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
index 11cbb67..fd848f2 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.List;
+import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.storage.gridtable.GTBuilder;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRecord;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
index e6a1fb4..6f86c95 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
@@ -17,13 +17,13 @@
package org.apache.kylin.job.inmemcubing;
-import static org.junit.Assert.*;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
import java.util.ArrayList;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
-import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
+import static org.junit.Assert.*;
public class MemoryBudgetControllerTest {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index d5132bb..fdca42b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -18,18 +18,10 @@
package org.apache.kylin.storage.hbase;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
@@ -63,10 +55,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
/**
* @author xjiang, yangli9
@@ -138,7 +127,8 @@ public class CubeStorageEngine implements ICachableStorageEngine {
// check involved measures, build value decoder for each each family:column
List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
- setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+ //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
+ //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 2553dfd..d540084 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -18,17 +18,18 @@
package org.apache.kylin.storage.hbase.coprocessor;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.metadata.measure.MeasureAggregator;
-import com.google.common.collect.Maps;
+import java.util.Map;
/**
*/
@SuppressWarnings("rawtypes")
public abstract class AggregationCache {
- static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+ static final long MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+ static final long MEMOERY_MAX_BYTES = Runtime.getRuntime().maxMemory();
protected final Map<AggrKey, MeasureAggregator[]> aggBufMap;
transient int rowMemBytes;
private AggrKey firstKey = null;
@@ -73,9 +74,15 @@ public abstract class AggregationCache {
}
}
int size = aggBufMap.size();
- int memUsage = (40 + rowMemBytes) * size;
+ long memUsage = (40 + rowMemBytes) * size;
if (memUsage > MEMORY_USAGE_CAP) {
- throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+ throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+ }
+
+ //If less than 5% of max memory
+ long avail = MemoryBudgetController.getSystemAvailBytes();
+ if (avail < (MEMOERY_MAX_BYTES / 20)) {
+ throw new RuntimeException("Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c1f25249/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 9b3753d..420e4c4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -18,14 +18,8 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -44,10 +38,22 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.RegionScannerAdapter;
import org.apache.kylin.storage.hbase.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
/**
* @author yangli9
@@ -95,7 +101,6 @@ public class ObserverEnabler {
private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
-
String forceFlag = System.getProperty(FORCE_COPROCESSOR);
if (forceFlag != null) {
return Boolean.parseBoolean(forceFlag);
@@ -106,10 +111,10 @@ public class ObserverEnabler {
return cubeOverride.booleanValue();
}
- if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
- logger.info("Coprocessor is disabled because there is memory hungry count distinct");
- return false;
- }
+ // if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
+ // logger.info("Coprocessor is disabled because there is memory hungry count distinct");
+ // return false;
+ // }
if (context.isExactAggregation()) {
logger.info("Coprocessor is disabled because exactAggregation is true");