You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/30 01:31:19 UTC

incubator-kylin git commit: KYLIN-668, a new InMemCubeBuilder under job.inmemcubing, write disk directly

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 a768f1900 -> 3d6a036b1


KYLIN-668, a new InMemCubeBuilder under job.inmemcubing, write disk directly


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3d6a036b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3d6a036b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3d6a036b

Branch: refs/heads/0.8.0
Commit: 3d6a036b1d47c53ba04164f627ab8d6d1193405e
Parents: a768f19
Author: Yang Li <li...@apache.org>
Authored: Sat May 30 07:30:53 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat May 30 07:30:53 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     |  21 ++-
 .../apache/kylin/job/InMemCubeBuilderTest.java  | 151 ----------------
 .../job/inmemcubing/InMemCubeBuilderTest.java   | 178 +++++++++++++++++++
 .../storage/gridtable/GTAggregateScanner.java   |   1 -
 .../kylin/storage/gridtable/MemoryChecker.java  |  29 ---
 .../gridtable/memstore/GTMemDiskStore.java      |  44 +++--
 .../gridtable/memstore/GTSimpleMemStore.java    |   2 -
 .../kylin/streaming/cube/InMemCubeBuilder.java  |  17 --
 8 files changed, 222 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 26286cb..03496e7 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -8,6 +8,8 @@ import org.slf4j.LoggerFactory;
 
 public class MemoryBudgetController {
 
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+
     public static interface MemoryConsumer {
         // return number MB released
         int freeUp(int mb);
@@ -36,11 +38,10 @@ public class MemoryBudgetController {
     private final AtomicInteger totalReservedMB;
     private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
 
-
     public MemoryBudgetController(int totalBudgetMB) {
         if (totalBudgetMB < 0)
             throw new IllegalArgumentException();
-        if (checkSystemAvailMB(totalBudgetMB) == false)
+        if (totalBudgetMB > 0 && checkSystemAvailMB(totalBudgetMB) == false)
             throw new IllegalStateException();
 
         this.totalBudgetMB = totalBudgetMB;
@@ -62,7 +63,7 @@ public class MemoryBudgetController {
     public void reserve(MemoryConsumer consumer, int requestMB) {
         if (totalBudgetMB == 0 && requestMB > 0)
             throw new NotEnoughBudgetException();
-        
+
         ConsumerEntry entry = booking.get(consumer);
         if (entry == null) {
             booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
@@ -87,7 +88,9 @@ public class MemoryBudgetController {
         if (delta < 0) {
             this.notify();
         }
-        logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        if (delta != 0) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
     }
 
     private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
@@ -110,8 +113,10 @@ public class MemoryBudgetController {
                 break;
 
             try {
-                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
-                this.wait(200);
+                synchronized (this) {
+                    logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+                    this.wait(200);
+                }
             } catch (InterruptedException e) {
                 logger.error("Interrupted while wait free memory", e);
             }
@@ -133,13 +138,13 @@ public class MemoryBudgetController {
         long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
         return availableMemory;
     }
-    
+
     public static int getSystemAvailMB() {
         return (int) (getSystemAvailBytes() / ONE_MB);
     }
 
     public static int getMaxPossibleBudget() {
-        return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+        return getSystemAvailMB() - SYSTEM_RESERVED;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
deleted file mode 100644
index 781273d..0000000
--- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.lookup.FileTableReader;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-    private CubeManager cubeManager;
-
-    @Before
-    public void before() {
-        createTestMetadata();
-        
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        cubeManager = CubeManager.getInstance(kylinConfig);
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-    
-    @Test
-    public void test() throws Exception {
-        final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-
-        Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
-        int nColumns = flatTableDesc.getColumnList().size();
-        FileTableReader reader = new FileTableReader(flatTable, nColumns);
-        
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            queue.put(Arrays.asList(row));
-        }
-        queue.put(new ArrayList<String>(0));
-        reader.close();
-
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            throw new IOException("Failed to build cube ", e);
-        }
-
-        logger.info("stream build finished");
-    }
-
-    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
-        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
-        CubeDesc desc = cube.getDescriptor();
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        int nColumns = flatTableDesc.getColumnList().size();
-
-        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
-        for (int c = 0; c < columns.size(); c++) {
-            TblColRef col = columns.get(c);
-            if (desc.getRowkey().isUseDictionary(col)) {
-                logger.info("Building dictionary for " + col);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
-                result.put(col, dict);
-            }
-        }
-        return result;
-    }
-
-    private List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
-        List<byte[]> result = Lists.newArrayList();
-        FileTableReader reader = new FileTableReader(flatTable, nColumns);
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            if (row[c] != null) {
-                result.add(Bytes.toBytes(row[c]));
-            }
-        }
-        reader.close();
-        return result;
-    }
-
-    class ConsoleGTRecordWriter implements IGTRecordWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
new file mode 100644
index 0000000..b5924c2
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -0,0 +1,178 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.lookup.FileTableReader;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
+
+    private KylinConfig kylinConfig;
+    private CubeManager cubeManager;
+
+    @Before
+    public void before() {
+        createTestMetadata();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        cubeManager = CubeManager.getInstance(kylinConfig);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+
+        Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cubeBuilder);
+
+        feedData(cube, flatTable, queue, 70000);
+
+        try {
+            future.get();
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        }
+
+        logger.info("stream build finished");
+    }
+
+    private void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        @SuppressWarnings("unchecked")
+        Set<String>[] distinctSets = new Set[nColumns];
+        for (int i = 0; i < nColumns; i++)
+            distinctSets[i] = new TreeSet<String>();
+        
+        // get distinct values on each column
+        FileTableReader reader = new FileTableReader(flatTable, nColumns);
+        while (count > 0 && reader.next()) {
+            String[] row = reader.getRow();
+            for (int i = 0; i < nColumns; i++)
+                distinctSets[i].add(row[i]);
+        }
+        reader.close();
+        
+        List<String[]> distincts = new ArrayList<String[]>();
+        for (int i = 0; i < nColumns; i++) {
+            distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+        }
+        
+        // output with random data
+        Random rand = new Random();
+        for (; count > 0; count--) {
+            ArrayList<String> row = new ArrayList<String>(nColumns);
+            for (int i = 0; i < nColumns; i++) {
+                String[] candidates = distincts.get(i);
+                row.add(candidates[rand.nextInt(candidates.length)]);
+            }
+            queue.put(row);
+        }
+        queue.put(new ArrayList<String>(0));
+    }
+
+    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+        CubeDesc desc = cube.getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+        for (int c = 0; c < columns.size(); c++) {
+            TblColRef col = columns.get(c);
+            if (desc.getRowkey().isUseDictionary(col)) {
+                logger.info("Building dictionary for " + col);
+                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
+                result.put(col, dict);
+            }
+        }
+        return result;
+    }
+
+    private List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+        List<byte[]> result = Lists.newArrayList();
+        FileTableReader reader = new FileTableReader(flatTable, nColumns);
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            if (row[c] != null) {
+                result.add(Bytes.toBytes(row[c]));
+            }
+        }
+        reader.close();
+        return result;
+    }
+
+    class ConsoleGTRecordWriter implements ICuboidWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index a7110ef..ec92f92 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -67,7 +67,6 @@ public class GTAggregateScanner implements IGTScanner {
         AggregationCache aggregationCacheWithBytesKey = new AggregationCache();
         for (GTRecord r : inputScanner) {
             aggregationCacheWithBytesKey.aggregate(r);
-            MemoryChecker.checkMemory();
         }
         return aggregationCacheWithBytesKey.iterator();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
deleted file mode 100644
index 2dac8fe..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public final class MemoryChecker {
-
-    private static Logger logger = LoggerFactory.getLogger(MemoryChecker.class);
-
-    private static final int MEMORY_THRESHOLD = 80 << 20;
-
-    private MemoryChecker() {
-    }
-
-    public static final void checkMemory() {
-        if (!Thread.currentThread().isInterrupted()) {
-            final long freeMem = Runtime.getRuntime().freeMemory();
-            if (freeMem <= MEMORY_THRESHOLD) {
-                throw new OutOfMemoryError("free memory:" + freeMem + " is lower than " + MEMORY_THRESHOLD);
-            }
-        } else {
-            logger.info("thread interrupted");
-            throw new OutOfMemoryError("thread interrupted");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 4ae4f5c..4cc2f85 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -31,12 +31,15 @@ import org.slf4j.LoggerFactory;
 public class GTMemDiskStore implements IGTStore, Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+    private static final boolean debug = false;
+
     private static final int STREAM_BUFFER_SIZE = 8192;
     private static final int MEM_CHUNK_SIZE_MB = 1;
 
     final GTInfo info;
     final MemPart memPart;
     final DiskPart diskPart;
+    final boolean delOnClose;
 
     public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
         this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
@@ -46,12 +49,14 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         this(info, budgetCtrl, diskFile, false);
     }
 
-    private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
+    private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
         this.info = info;
         this.memPart = new MemPart(budgetCtrl);
         this.diskPart = new DiskPart(diskFile);
+        this.delOnClose = delOnClose;
 
-        if (delOnExit)
+        // in case the user forgets to call close()
+        if (delOnClose)
             diskFile.deleteOnExit();
     }
 
@@ -99,7 +104,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         Reader() throws IOException {
             diskPart.openRead();
-            logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
 
             InputStream in = new InputStream() {
                 byte[] tmp = new byte[1];
@@ -201,7 +207,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         public void close() throws IOException {
             din.close();
             diskPart.closeRead();
-            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
         }
 
     }
@@ -219,7 +226,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             memPart.clear();
             diskPart.clear();
             diskPart.openWrite(false);
-            logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
 
             memPart.activateMemWrite();
 
@@ -268,7 +276,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             memPart.finishAsyncFlush();
             diskPart.closeWrite();
             assert diskOffset == diskPart.tailOffset;
-            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
         }
     }
 
@@ -370,7 +379,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                     }
                     chunkCount++;
                 }
-                
+
                 int n = Math.min(lastChunk.freeSpace(), length);
                 System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
                 lastChunk.length += n;
@@ -392,7 +401,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 asyncFlusher = new Thread() {
                     public void run() {
                         asyncFlushException = null;
-                        logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        if (debug)
+                            logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
                         try {
                             while (writeActivated) {
                                 flushToDisk();
@@ -402,7 +412,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                         } catch (Throwable ex) {
                             asyncFlushException = ex;
                         }
-                        logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+                        if (debug)
+                            logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
                     }
                 };
                 asyncFlusher.start();
@@ -419,7 +430,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 data = null;
                 synchronized (memPart) {
                     asyncFlushDiskOffset += flushedLen; // bytes written in last loop
-                    // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+                    if (debug)
+                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
                     if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
                         asyncFlushChunk = asyncFlushChunk.next;
                     }
@@ -479,12 +491,14 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         public void activateMemWrite() {
             writeActivated = true;
-            logger.debug(GTMemDiskStore.this + " mem write activated");
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " mem write activated");
         }
 
         public void deactivateMemWrite() {
             writeActivated = false;
-            logger.debug(GTMemDiskStore.this + " mem write de-activated");
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " mem write de-activated");
         }
 
         synchronized public void clear() {
@@ -515,7 +529,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         DiskPart(File diskFile) throws IOException {
             this.diskFile = diskFile;
             this.tailOffset = diskFile.length();
-            logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+            if (debug)
+                logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
         }
 
         public void openRead() throws IOException {
@@ -567,6 +582,9 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         public void close() throws IOException {
             closeWrite();
             closeRead();
+            if (delOnClose) {
+                diskFile.delete();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index c97fe39..a4d0b8d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -11,7 +11,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTRowBlock;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.gridtable.*;
 
 public class GTSimpleMemStore implements IGTStore {
 
@@ -74,7 +73,6 @@ public class GTSimpleMemStore implements IGTStore {
             } else {
                 assert id == rowBlockList.size();
                 rowBlockList.add(copy);
-                MemoryChecker.checkMemory();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
index e5a5b5f..0b0dc4c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -1,35 +1,18 @@
 /*
- *
- *
  *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
  *  contributor license agreements. See the NOTICE file distributed with
- *
  *  this work for additional information regarding copyright ownership.
- *
  *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
  *  (the "License"); you may not use this file except in compliance with
- *
  *  the License. You may obtain a copy of the License at
  *
- *
- *
  *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *
- *
  *  Unless required by applicable law or agreed to in writing, software
- *
  *  distributed under the License is distributed on an "AS IS" BASIS,
- *
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
  *  See the License for the specific language governing permissions and
- *
  *  limitations under the License.
- *
- * /
  */
 package org.apache.kylin.streaming.cube;