You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/30 01:31:19 UTC
incubator-kylin git commit: KYLIN-668, a new InMemCubeBuilder under job.inmemcubing, write disk directly
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 a768f1900 -> 3d6a036b1
KYLIN-668, a new InMemCubeBuilder under job.inmemcubing, write disk directly
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3d6a036b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3d6a036b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3d6a036b
Branch: refs/heads/0.8.0
Commit: 3d6a036b1d47c53ba04164f627ab8d6d1193405e
Parents: a768f19
Author: Yang Li <li...@apache.org>
Authored: Sat May 30 07:30:53 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat May 30 07:30:53 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 21 ++-
.../apache/kylin/job/InMemCubeBuilderTest.java | 151 ----------------
.../job/inmemcubing/InMemCubeBuilderTest.java | 178 +++++++++++++++++++
.../storage/gridtable/GTAggregateScanner.java | 1 -
.../kylin/storage/gridtable/MemoryChecker.java | 29 ---
.../gridtable/memstore/GTMemDiskStore.java | 44 +++--
.../gridtable/memstore/GTSimpleMemStore.java | 2 -
.../kylin/streaming/cube/InMemCubeBuilder.java | 17 --
8 files changed, 222 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 26286cb..03496e7 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -8,6 +8,8 @@ import org.slf4j.LoggerFactory;
public class MemoryBudgetController {
+ public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+
public static interface MemoryConsumer {
// return number MB released
int freeUp(int mb);
@@ -36,11 +38,10 @@ public class MemoryBudgetController {
private final AtomicInteger totalReservedMB;
private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-
public MemoryBudgetController(int totalBudgetMB) {
if (totalBudgetMB < 0)
throw new IllegalArgumentException();
- if (checkSystemAvailMB(totalBudgetMB) == false)
+ if (totalBudgetMB > 0 && checkSystemAvailMB(totalBudgetMB) == false)
throw new IllegalStateException();
this.totalBudgetMB = totalBudgetMB;
@@ -62,7 +63,7 @@ public class MemoryBudgetController {
public void reserve(MemoryConsumer consumer, int requestMB) {
if (totalBudgetMB == 0 && requestMB > 0)
throw new NotEnoughBudgetException();
-
+
ConsumerEntry entry = booking.get(consumer);
if (entry == null) {
booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
@@ -87,7 +88,9 @@ public class MemoryBudgetController {
if (delta < 0) {
this.notify();
}
- logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ if (delta != 0) {
+ logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ }
}
private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
@@ -110,8 +113,10 @@ public class MemoryBudgetController {
break;
try {
- logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
- this.wait(200);
+ synchronized (this) {
+ logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+ this.wait(200);
+ }
} catch (InterruptedException e) {
logger.error("Interrupted while wait free memory", e);
}
@@ -133,13 +138,13 @@ public class MemoryBudgetController {
long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
return availableMemory;
}
-
+
public static int getSystemAvailMB() {
return (int) (getSystemAvailBytes() / ONE_MB);
}
public static int getMaxPossibleBudget() {
- return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+ return getSystemAvailMB() - SYSTEM_RESERVED;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
deleted file mode 100644
index 781273d..0000000
--- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.lookup.FileTableReader;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
- private KylinConfig kylinConfig;
- private CubeManager cubeManager;
-
- @Before
- public void before() {
- createTestMetadata();
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- cubeManager = CubeManager.getInstance(kylinConfig);
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
- final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-
- Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
-
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cubeBuilder);
-
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
- int nColumns = flatTableDesc.getColumnList().size();
- FileTableReader reader = new FileTableReader(flatTable, nColumns);
-
- while (reader.next()) {
- String[] row = reader.getRow();
- queue.put(Arrays.asList(row));
- }
- queue.put(new ArrayList<String>(0));
- reader.close();
-
- try {
- future.get();
- } catch (Exception e) {
- logger.error("stream build failed", e);
- throw new IOException("Failed to build cube ", e);
- }
-
- logger.info("stream build finished");
- }
-
- private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
- Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
- CubeDesc desc = cube.getDescriptor();
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- int nColumns = flatTableDesc.getColumnList().size();
-
- List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
- for (int c = 0; c < columns.size(); c++) {
- TblColRef col = columns.get(c);
- if (desc.getRowkey().isUseDictionary(col)) {
- logger.info("Building dictionary for " + col);
- List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
- result.put(col, dict);
- }
- }
- return result;
- }
-
- private List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
- List<byte[]> result = Lists.newArrayList();
- FileTableReader reader = new FileTableReader(flatTable, nColumns);
- while (reader.next()) {
- String[] row = reader.getRow();
- if (row[c] != null) {
- result.add(Bytes.toBytes(row[c]));
- }
- }
- reader.close();
- return result;
- }
-
- class ConsoleGTRecordWriter implements IGTRecordWriter {
-
- boolean verbose = false;
-
- @Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
- if (verbose)
- System.out.println(record.toString());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
new file mode 100644
index 0000000..b5924c2
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.lookup.FileTableReader;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
+
+ private KylinConfig kylinConfig;
+ private CubeManager cubeManager;
+
+ @Before
+ public void before() {
+ createTestMetadata();
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ cubeManager = CubeManager.getInstance(kylinConfig);
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+ final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+ final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+
+ Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(cubeBuilder);
+
+ feedData(cube, flatTable, queue, 70000);
+
+ try {
+ future.get();
+ } catch (Exception e) {
+ logger.error("stream build failed", e);
+ throw new IOException("Failed to build cube ", e);
+ }
+
+ logger.info("stream build finished");
+ }
+
+ private void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ @SuppressWarnings("unchecked")
+ Set<String>[] distinctSets = new Set[nColumns];
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i] = new TreeSet<String>();
+
+ // get distinct values on each column
+ FileTableReader reader = new FileTableReader(flatTable, nColumns);
+ while (count > 0 && reader.next()) {
+ String[] row = reader.getRow();
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i].add(row[i]);
+ }
+ reader.close();
+
+ List<String[]> distincts = new ArrayList<String[]>();
+ for (int i = 0; i < nColumns; i++) {
+ distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+ }
+
+ // output with random data
+ Random rand = new Random();
+ for (; count > 0; count--) {
+ ArrayList<String> row = new ArrayList<String>(nColumns);
+ for (int i = 0; i < nColumns; i++) {
+ String[] candidates = distincts.get(i);
+ row.add(candidates[rand.nextInt(candidates.length)]);
+ }
+ queue.put(row);
+ }
+ queue.put(new ArrayList<String>(0));
+ }
+
+ private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+ Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+ CubeDesc desc = cube.getDescriptor();
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+ for (int c = 0; c < columns.size(); c++) {
+ TblColRef col = columns.get(c);
+ if (desc.getRowkey().isUseDictionary(col)) {
+ logger.info("Building dictionary for " + col);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+ Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
+ result.put(col, dict);
+ }
+ }
+ return result;
+ }
+
+ private List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+ List<byte[]> result = Lists.newArrayList();
+ FileTableReader reader = new FileTableReader(flatTable, nColumns);
+ while (reader.next()) {
+ String[] row = reader.getRow();
+ if (row[c] != null) {
+ result.add(Bytes.toBytes(row[c]));
+ }
+ }
+ reader.close();
+ return result;
+ }
+
+ class ConsoleGTRecordWriter implements ICuboidWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index a7110ef..ec92f92 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -67,7 +67,6 @@ public class GTAggregateScanner implements IGTScanner {
AggregationCache aggregationCacheWithBytesKey = new AggregationCache();
for (GTRecord r : inputScanner) {
aggregationCacheWithBytesKey.aggregate(r);
- MemoryChecker.checkMemory();
}
return aggregationCacheWithBytesKey.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
deleted file mode 100644
index 2dac8fe..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public final class MemoryChecker {
-
- private static Logger logger = LoggerFactory.getLogger(MemoryChecker.class);
-
- private static final int MEMORY_THRESHOLD = 80 << 20;
-
- private MemoryChecker() {
- }
-
- public static final void checkMemory() {
- if (!Thread.currentThread().isInterrupted()) {
- final long freeMem = Runtime.getRuntime().freeMemory();
- if (freeMem <= MEMORY_THRESHOLD) {
- throw new OutOfMemoryError("free memory:" + freeMem + " is lower than " + MEMORY_THRESHOLD);
- }
- } else {
- logger.info("thread interrupted");
- throw new OutOfMemoryError("thread interrupted");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 4ae4f5c..4cc2f85 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -31,12 +31,15 @@ import org.slf4j.LoggerFactory;
public class GTMemDiskStore implements IGTStore, Closeable {
private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+ private static final boolean debug = false;
+
private static final int STREAM_BUFFER_SIZE = 8192;
private static final int MEM_CHUNK_SIZE_MB = 1;
final GTInfo info;
final MemPart memPart;
final DiskPart diskPart;
+ final boolean delOnClose;
public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
@@ -46,12 +49,14 @@ public class GTMemDiskStore implements IGTStore, Closeable {
this(info, budgetCtrl, diskFile, false);
}
- private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
+ private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
this.info = info;
this.memPart = new MemPart(budgetCtrl);
this.diskPart = new DiskPart(diskFile);
+ this.delOnClose = delOnClose;
- if (delOnExit)
+ // in case user forget to call close()
+ if (delOnClose)
diskFile.deleteOnExit();
}
@@ -99,7 +104,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
Reader() throws IOException {
diskPart.openRead();
- logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
InputStream in = new InputStream() {
byte[] tmp = new byte[1];
@@ -201,7 +207,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public void close() throws IOException {
din.close();
diskPart.closeRead();
- logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
}
}
@@ -219,7 +226,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
memPart.clear();
diskPart.clear();
diskPart.openWrite(false);
- logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
memPart.activateMemWrite();
@@ -268,7 +276,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
memPart.finishAsyncFlush();
diskPart.closeWrite();
assert diskOffset == diskPart.tailOffset;
- logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
}
}
@@ -370,7 +379,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
chunkCount++;
}
-
+
int n = Math.min(lastChunk.freeSpace(), length);
System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
lastChunk.length += n;
@@ -392,7 +401,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
asyncFlusher = new Thread() {
public void run() {
asyncFlushException = null;
- logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
try {
while (writeActivated) {
flushToDisk();
@@ -402,7 +412,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
} catch (Throwable ex) {
asyncFlushException = ex;
}
- logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
}
};
asyncFlusher.start();
@@ -419,7 +430,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
data = null;
synchronized (memPart) {
asyncFlushDiskOffset += flushedLen; // bytes written in last loop
- // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
asyncFlushChunk = asyncFlushChunk.next;
}
@@ -479,12 +491,14 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public void activateMemWrite() {
writeActivated = true;
- logger.debug(GTMemDiskStore.this + " mem write activated");
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " mem write activated");
}
public void deactivateMemWrite() {
writeActivated = false;
- logger.debug(GTMemDiskStore.this + " mem write de-activated");
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " mem write de-activated");
}
synchronized public void clear() {
@@ -515,7 +529,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
DiskPart(File diskFile) throws IOException {
this.diskFile = diskFile;
this.tailOffset = diskFile.length();
- logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
}
public void openRead() throws IOException {
@@ -567,6 +582,9 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public void close() throws IOException {
closeWrite();
closeRead();
+ if (delOnClose) {
+ diskFile.delete();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index c97fe39..a4d0b8d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -11,7 +11,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.gridtable.*;
public class GTSimpleMemStore implements IGTStore {
@@ -74,7 +73,6 @@ public class GTSimpleMemStore implements IGTStore {
} else {
assert id == rowBlockList.size();
rowBlockList.add(copy);
- MemoryChecker.checkMemory();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d6a036b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
index e5a5b5f..0b0dc4c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -1,35 +1,18 @@
/*
- *
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
- *
* contributor license agreements. See the NOTICE file distributed with
- *
* this work for additional information regarding copyright ownership.
- *
* The ASF licenses this file to You under the Apache License, Version 2.0
- *
* (the "License"); you may not use this file except in compliance with
- *
* the License. You may obtain a copy of the License at
*
- *
- *
* http://www.apache.org/licenses/LICENSE-2.0
*
- *
- *
* Unless required by applicable law or agreed to in writing, software
- *
* distributed under the License is distributed on an "AS IS" BASIS,
- *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
* See the License for the specific language governing permissions and
- *
* limitations under the License.
- *
- * /
*/
package org.apache.kylin.streaming.cube;