You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/30 07:09:42 UTC
incubator-kylin git commit: Revert "KYLIN-980 spill to disk when sys available memory is low"
Repository: incubator-kylin
Updated Branches:
refs/heads/1.x-staging 6f93a4d62 -> cfe9f0a8a
Revert "KYLIN-980 spill to disk when sys available memory is low"
This reverts commit 0f8fc239162bbca913b1eceb380d89f674928400.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cfe9f0a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cfe9f0a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cfe9f0a8
Branch: refs/heads/1.x-staging
Commit: cfe9f0a8ac960b59281aefa0245e79ed4b534294
Parents: 6f93a4d
Author: shaofengshi <sh...@apache.org>
Authored: Mon Nov 30 14:08:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Nov 30 14:08:27 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 249 -------------------
.../hadoop/cube/FactDistinctColumnsReducer.java | 33 +--
2 files changed, 8 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cfe9f0a8/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
deleted file mode 100644
index 4715ef6..0000000
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.util;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class MemoryBudgetController {
-
- private static final boolean debug = true;
-
- public interface MemoryConsumer {
- // return number MB released
- int freeUp(int mb);
- }
-
- @SuppressWarnings("serial")
- public static class NotEnoughBudgetException extends IllegalStateException {
-
- public NotEnoughBudgetException() {
- super();
- }
-
- public NotEnoughBudgetException(Throwable cause) {
- super(cause);
- }
- }
-
- private static class ConsumerEntry {
- final MemoryConsumer consumer;
- int reservedMB;
-
- ConsumerEntry(MemoryConsumer consumer) {
- this.consumer = consumer;
- }
- }
-
- public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
- public static final int ONE_MB = 1024 * 1024;
- public static final long ONE_GB = 1024 * 1024 * 1024;
-
- private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
- // all budget numbers are in MB
- private final int totalBudgetMB;
- private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
- private int totalReservedMB;
- private final ReentrantLock lock = new ReentrantLock();
-
- public MemoryBudgetController(int totalBudgetMB) {
- Preconditions.checkArgument(totalBudgetMB >= 0);
- Preconditions.checkState(totalBudgetMB <= getSystemAvailMB());
- this.totalBudgetMB = totalBudgetMB;
- this.totalReservedMB = 0;
- }
-
- public int getTotalBudgetMB() {
- return totalBudgetMB;
- }
-
- public int getTotalReservedMB() {
- lock.lock();
- try {
- return totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public int getRemainingBudgetMB() {
- lock.lock();
- try {
- return totalBudgetMB - totalReservedMB;
- } finally {
- lock.unlock();
- }
- }
-
- public void reserveInsist(MemoryConsumer consumer, int requestMB) {
- long waitStart = 0;
- while (true) {
- try {
- reserve(consumer, requestMB);
- if (debug && waitStart > 0)
- logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
- return;
- } catch (NotEnoughBudgetException ex) {
- // retry
- }
-
- if (waitStart == 0)
- waitStart = System.currentTimeMillis();
-
- synchronized (lock) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- throw new NotEnoughBudgetException(e);
- }
- }
- }
- }
-
- /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
- public void reserve(MemoryConsumer consumer, int requestMB) {
- if (totalBudgetMB == 0 && requestMB > 0)
- throw new NotEnoughBudgetException();
-
- boolean ok = false;
- while (!ok) {
- int gap = calculateGap(consumer, requestMB);
- if (gap > 0) {
- // to void deadlock, don't hold lock when invoking consumer.freeUp()
- tryFreeUp(gap);
- }
- ok = updateBooking(consumer, requestMB);
- }
- }
-
- private int calculateGap(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- int curMB = entry == null ? 0 : entry.reservedMB;
- int delta = requestMB - curMB;
- return delta - (totalBudgetMB - totalReservedMB);
- } finally {
- lock.unlock();
- }
- }
-
- private void tryFreeUp(int gap) {
- // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
- for (ConsumerEntry entry : booking.values()) {
- int mb = entry.consumer.freeUp(gap);
- if (mb > 0) {
- lock.lock();
- try {
- updateBookingWithDelta(entry.consumer, -mb);
- } finally {
- lock.unlock();
- }
- gap -= mb;
- if (gap <= 0)
- break;
- }
- }
- if (gap > 0)
- throw new NotEnoughBudgetException();
-
- if (debug) {
- if (getSystemAvailMB() < getRemainingBudgetMB()) {
- logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
- }
- }
- }
-
- private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
- lock.lock();
- try {
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (requestMB == 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- int delta = requestMB - entry.reservedMB;
- return updateBookingWithDelta(consumer, delta);
- } finally {
- lock.unlock();
- }
- }
-
- // lock MUST be obtained before entering
- private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
- if (delta == 0)
- return true;
-
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- if (delta <= 0)
- return true;
-
- entry = new ConsumerEntry(consumer);
- booking.put(consumer, entry);
- }
-
- // double check gap again, it may be changed by other concurrent requests
- if (delta > 0) {
- int gap = delta - (totalBudgetMB - totalReservedMB);
- if (gap > 0)
- return false;
- }
-
- totalReservedMB += delta;
- entry.reservedMB += delta;
- if (entry.reservedMB == 0) {
- booking.remove(entry.consumer);
- }
- if (debug) {
- logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
- }
-
- if (delta < 0) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
-
- return true;
- }
-
- public static long getSystemAvailBytes() {
- Runtime runtime = Runtime.getRuntime();
- long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
- long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
- long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
- long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
- long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
- return availableMemory;
- }
-
- public static int getSystemAvailMB() {
- return (int) (getSystemAvailBytes() / ONE_MB);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cfe9f0a8/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index f18e840..89f90ba 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -29,7 +29,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinReducer;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,9 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
/**
* @author yangli9
@@ -68,34 +69,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
TblColRef col = columnList.get(key.get());
HashSet<ByteArray> set = new HashSet<ByteArray>();
- int count = 0;
for (Text textValue : values) {
ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
set.add(value);
- count++;
- if (count % 10000 == 0 && MemoryBudgetController.getSystemAvailMB() < 100) {
- outputDistinctValues(col, set, context);
- set.clear();
- }
- }
-
- if (set.isEmpty() == false) {
- outputDistinctValues(col, set, context);
}
- }
-
- private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException {
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = FileSystem.get(conf);
- final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, col.getName());
- FSDataOutputStream out;
- if (fs.exists(outputFile)) {
- out = fs.append(outputFile);
- } else {
- out = fs.create(outputFile);
- }
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
try {
for (ByteArray value : set) {
@@ -105,6 +87,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
} finally {
out.close();
}
+
}
}