You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/30 14:52:08 UTC
incubator-kylin git commit: KYLIN-668,
InMemCubeBuilder move to job module
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 3d6a036b1 -> b4b108842
KYLIN-668, InMemCubeBuilder move to job module
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b4b10884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b4b10884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b4b10884
Branch: refs/heads/0.8.0
Commit: b4b1088423235cdedc43daecaf051beeb6b4ce98
Parents: 3d6a036
Author: Yang Li <li...@apache.org>
Authored: Sat May 30 20:51:28 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat May 30 20:51:28 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 10 +-
.../common/util/MemoryBudgetControllerTest.java | 2 +-
.../job/hadoop/cubev2/InMemCuboidMapper.java | 21 +-
.../hadoop/cubev2/MapContextGTRecordWriter.java | 19 +-
.../kylin/job/streaming/CubeStreamBuilder.java | 55 +-
.../kylin/job/BuildCubeWithStreamTest.java | 48 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 8 +-
.../kylin/metadata/measure/HLLCAggregator.java | 2 +-
.../storage/gridtable/GTAggregateScanner.java | 25 +-
.../gridtable/memstore/GTMemDiskStore.java | 305 ++++++----
.../kylin/streaming/cube/IGTRecordWriter.java | 11 -
.../kylin/streaming/cube/InMemCubeBuilder.java | 550 -------------------
.../cube/InMemCubeBuilderBenchmarkTest.java | 117 ----
13 files changed, 292 insertions(+), 881 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 03496e7..82e503c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -8,8 +8,6 @@ import org.slf4j.LoggerFactory;
public class MemoryBudgetController {
- public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
-
public static interface MemoryConsumer {
// return number MB released
int freeUp(int mb);
@@ -28,8 +26,8 @@ public class MemoryBudgetController {
}
}
+ public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
public static final int ONE_MB = 1024 * 1024;
- public static final int SYSTEM_RESERVED = 200;
private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
@@ -126,7 +124,7 @@ public class MemoryBudgetController {
}
private boolean checkSystemAvailMB(int mb) {
- return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+ return getSystemAvailMB() >= mb;
}
public static long getSystemAvailBytes() {
@@ -143,8 +141,4 @@ public class MemoryBudgetController {
return (int) (getSystemAvailBytes() / ONE_MB);
}
- public static int getMaxPossibleBudget() {
- return getSystemAvailMB() - SYSTEM_RESERVED;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
index bc2eadd..a523263 100644
--- a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -11,7 +11,7 @@ public class MemoryBudgetControllerTest {
@Test
public void test() {
- int n = MemoryBudgetController.getMaxPossibleBudget() / 2;
+ int n = MemoryBudgetController.getSystemAvailMB() / 2;
MemoryBudgetController mbc = new MemoryBudgetController(n);
ArrayList<OneMB> mbList = new ArrayList<OneMB>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 4efff16..fdb4823 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -1,6 +1,15 @@
package org.apache.kylin.job.hadoop.cubev2;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -18,15 +27,11 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.lookup.HiveTableReader;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.collect.Maps;
/**
*/
@@ -70,7 +75,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
}
}
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
ExecutorService executorService = Executors.newSingleThreadExecutor();
future = executorService.submit(cubeBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 283bed6..6fcd01a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -1,6 +1,9 @@
package org.apache.kylin.job.hadoop.cubev2;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -11,21 +14,13 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.List;
/**
*/
-public class MapContextGTRecordWriter implements IGTRecordWriter {
+public class MapContextGTRecordWriter implements ICuboidWriter {
private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
protected MapContext<?, ?, ImmutableBytesWritable, Text> mapContext;
@@ -52,7 +47,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
}
@Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
+ public void write(long cuboidId, GTRecord record) throws IOException {
if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
// output another cuboid
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 0bd2792..be44e18 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -1,13 +1,21 @@
package org.apache.kylin.job.streaming;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +50,8 @@ import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
@@ -49,19 +59,17 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.streaming.SEOJsonStreamParser;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
/**
*/
@@ -111,7 +119,7 @@ public class CubeStreamBuilder extends StreamBuilder {
final HTableInterface hTable = createHTable(cubeSegment);
final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
dictionaryMap, gtRecordWriter);
executorService.submit(inMemCubeBuilder).get();
@@ -144,7 +152,7 @@ public class CubeStreamBuilder extends StreamBuilder {
}
}
- private class CubeStreamRecordWriter implements IGTRecordWriter {
+ private class CubeStreamRecordWriter implements ICuboidWriter {
final List<InMemKeyValueCreator> keyValueCreators;
final int nColumns;
final HTableInterface hTable;
@@ -185,7 +193,7 @@ public class CubeStreamBuilder extends StreamBuilder {
}
@Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
+ public void write(long cuboidId, GTRecord record) throws IOException {
final ByteBuffer key = createKey(cuboidId, record);
final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
final BitSet bitSet = new BitSet();
@@ -380,11 +388,6 @@ public class CubeStreamBuilder extends StreamBuilder {
return hTable;
}
- private void loadToHTable(String hTableName) throws IOException {
- final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-
- }
-
@Override
protected void onStop() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 319d7fa..9dca76a 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,11 +34,18 @@
package org.apache.kylin.job;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
@@ -55,25 +62,24 @@ import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
/**
*
@@ -145,7 +151,7 @@ public class BuildCubeWithStreamTest {
ArrayBlockingQueue queue = new ArrayBlockingQueue<List<String>>(10000);
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(cubeBuilder);
@@ -211,12 +217,12 @@ public class BuildCubeWithStreamTest {
}
- class ConsoleGTRecordWriter implements IGTRecordWriter {
+ class ConsoleGTRecordWriter implements ICuboidWriter {
boolean verbose = false;
@Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
+ public void write(long cuboidId, GTRecord record) throws IOException {
if (verbose)
System.out.println(record.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index b5924c2..0a0c97c 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -81,11 +81,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(cubeBuilder);
- feedData(cube, flatTable, queue, 70000);
+ feedData(cube, flatTable, queue, 60000);
try {
future.get();
@@ -93,8 +93,6 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
logger.error("stream build failed", e);
throw new IOException("Failed to build cube ", e);
}
-
- logger.info("stream build finished");
}
private void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
@@ -170,7 +168,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
boolean verbose = false;
@Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
+ public void write(long cuboidId, GTRecord record) throws IOException {
if (verbose)
System.out.println(record.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
index b07fd80..a168743 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
@@ -58,7 +58,7 @@ public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+ 4 // precision
+ 8 // ref to HLLC
+ 8 // HLLC obj shell
- + 32 + (1 >> precision); // HLLC internal
+ + 32 + (1 << precision); // HLLC internal
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index ec92f92..fcff4c0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -28,6 +28,8 @@ public class GTAggregateScanner implements IGTScanner {
final BitSet metrics;
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
+
+ long estimateSizeOfAggrCache;
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
if (req.hasAggregation() == false)
@@ -64,11 +66,19 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- AggregationCache aggregationCacheWithBytesKey = new AggregationCache();
+ AggregationCache aggrCache = new AggregationCache();
for (GTRecord r : inputScanner) {
- aggregationCacheWithBytesKey.aggregate(r);
+ aggrCache.aggregate(r);
}
- return aggregationCacheWithBytesKey.iterator();
+
+ estimateSizeOfAggrCache = aggrCache.esitmateMemSize();
+
+ return aggrCache.iterator();
+ }
+
+ /** for the last call to iterator(), return the estimated memory size of its aggregation cache */
+ public long getEstimateSizeOfAggrCache() {
+ return estimateSizeOfAggrCache;
}
class AggregationCache {
@@ -148,6 +158,15 @@ public class GTAggregateScanner implements IGTScanner {
aggrs[i].aggregate(metrics);
}
}
+
+ public long esitmateMemSize() {
+ if (aggBufMap.isEmpty())
+ return 0;
+
+ byte[] sampleKey = aggBufMap.firstKey();
+ MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
+ return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
+ }
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 4cc2f85..0b2c0a4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -34,12 +34,15 @@ public class GTMemDiskStore implements IGTStore, Closeable {
private static final boolean debug = false;
private static final int STREAM_BUFFER_SIZE = 8192;
- private static final int MEM_CHUNK_SIZE_MB = 1;
+ private static final int MEM_CHUNK_SIZE_MB = 2;
- final GTInfo info;
- final MemPart memPart;
- final DiskPart diskPart;
- final boolean delOnClose;
+ private final GTInfo info;
+ private final Object lock; // all public methods that read/write object states are synchronized on this lock
+ private final MemPart memPart;
+ private final DiskPart diskPart;
+ private final boolean delOnClose;
+
+ private Writer ongoingWriter;
public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
@@ -51,6 +54,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
this.info = info;
+ this.lock = this;
this.memPart = new MemPart(budgetCtrl);
this.diskPart = new DiskPart(diskFile);
this.delOnClose = delOnClose;
@@ -67,25 +71,44 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public IGTStoreWriter rebuild(int shard) throws IOException {
- return new Writer(0);
+ return newWriter(0);
}
@Override
public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
- return new Writer(diskPart.tailOffset);
+ return newWriter(length());
+ }
+
+ private Writer newWriter(long startOffset) throws IOException {
+ synchronized (lock) {
+ if (ongoingWriter != null)
+ throw new IllegalStateException();
+
+ ongoingWriter = new Writer(startOffset);
+ return ongoingWriter;
+ }
}
@Override
public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
- return new Reader();
+ synchronized (lock) {
+ return new Reader();
+ }
}
@Override
public void close() throws IOException {
+ // synchronized inside the parts close()
memPart.close();
diskPart.close();
}
+ public long length() {
+ synchronized (lock) {
+ return Math.max(memPart.tailOffset(), diskPart.tailOffset);
+ }
+ }
+
@Override
public String toString() {
return "MemDiskStore@" + this.hashCode();
@@ -94,7 +117,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
private class Reader implements IGTStoreScanner {
final DataInputStream din;
- long diskOffset = 0;
+ long readOffset = 0;
long memRead = 0;
long diskRead = 0;
int nReadCalls = 0;
@@ -105,7 +128,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
Reader() throws IOException {
diskPart.openRead();
if (debug)
- logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+ logger.debug(GTMemDiskStore.this + " read start @ " + readOffset);
InputStream in = new InputStream() {
byte[] tmp = new byte[1];
@@ -122,47 +145,51 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public int read(byte[] b, int off, int len) throws IOException {
- nReadCalls++;
- if (available() <= 0)
- return -1;
-
- if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
- memChunk = memPart.seekMemChunk(diskOffset);
- }
+ synchronized (lock) {
+ nReadCalls++;
+ if (available() <= 0)
+ return -1;
- int lenToGo = Math.min(available(), len);
+ if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+ memChunk = memPart.seekMemChunk(readOffset);
+ }
- int nRead = 0;
- while (lenToGo > 0) {
- int n;
- if (memChunk != null) {
- if (memChunk.headOffset() > diskOffset) {
- memChunk = null;
- continue;
+ int lenToGo = Math.min(available(), len);
+
+ int nRead = 0;
+ while (lenToGo > 0) {
+ int n;
+ if (memChunk != null) {
+ if (memChunk.headOffset() > readOffset) {
+ memChunk = null;
+ continue;
+ }
+ if (readOffset >= memChunk.tailOffset()) {
+ memChunk = memChunk.next;
+ continue;
+ }
+ int chunkOffset = (int) (readOffset - memChunk.headOffset());
+ n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+ System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+ memRead += n;
+ } else {
+ n = diskPart.read(readOffset, b, off, lenToGo);
+ diskRead += n;
}
- if (diskOffset >= memChunk.tailOffset()) {
- memChunk = memChunk.next;
- continue;
- }
- int chunkOffset = (int) (diskOffset - memChunk.headOffset());
- n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
- System.arraycopy(memChunk.data, chunkOffset, b, off, n);
- memRead += n;
- } else {
- n = diskPart.read(diskOffset, b, off, lenToGo);
- diskRead += n;
+ lenToGo -= n;
+ nRead += n;
+ off += n;
+ readOffset += n;
}
- lenToGo -= n;
- nRead += n;
- off += n;
- diskOffset += n;
+ return nRead;
}
- return nRead;
}
@Override
public int available() throws IOException {
- return (int) (diskPart.tailOffset - diskOffset);
+ synchronized (lock) {
+ return (int) (length() - readOffset);
+ }
}
};
@@ -205,10 +232,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void close() throws IOException {
- din.close();
- diskPart.closeRead();
- if (debug)
- logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+ synchronized (lock) {
+ din.close();
+ diskPart.closeRead();
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+ }
}
}
@@ -216,18 +245,19 @@ public class GTMemDiskStore implements IGTStore, Closeable {
private class Writer implements IGTStoreWriter {
final DataOutputStream dout;
- long diskOffset;
+ long writeOffset;
long memWrite = 0;
long diskWrite = 0;
int nWriteCalls;
+ boolean closed = false;
Writer(long startOffset) throws IOException {
- diskOffset = 0; // TODO does not support append yet
+ writeOffset = 0; // TODO does not support append yet
memPart.clear();
diskPart.clear();
diskPart.openWrite(false);
if (debug)
- logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+ logger.debug(GTMemDiskStore.this + " write start @ " + writeOffset);
memPart.activateMemWrite();
@@ -243,22 +273,23 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void write(byte[] bytes, int offset, int length) throws IOException {
+ // lock inside memPart.write() and diskPart.write()
nWriteCalls++;
while (length > 0) {
int n;
if (memPartActivated) {
- n = memPart.write(bytes, offset, length, diskOffset);
+ n = memPart.write(bytes, offset, length, writeOffset);
memWrite += n;
if (n == 0) {
memPartActivated = false;
}
} else {
- n = diskPart.write(diskOffset, bytes, offset, length);
+ n = diskPart.write(writeOffset, bytes, offset, length);
diskWrite += n;
}
offset += n;
length -= n;
- diskOffset += n;
+ writeOffset += n;
}
}
};
@@ -272,12 +303,23 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void close() throws IOException {
- dout.close();
- memPart.finishAsyncFlush();
- diskPart.closeWrite();
- assert diskOffset == diskPart.tailOffset;
- if (debug)
- logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ synchronized (lock) {
+ if (!closed) {
+ dout.close();
+ memPart.deactivateMemWrite();
+ }
+
+ if (memPart.asyncFlusher == null) {
+ assert writeOffset == diskPart.tailOffset;
+ diskPart.closeWrite();
+ ongoingWriter = null;
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ } else {
+ // the asyncFlusher will call this close() again later
+ }
+ closed = true;
+ }
}
}
@@ -308,7 +350,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
final MemoryBudgetController budgetCtrl;
- // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+ // async flush thread checks this flag out of sync block
volatile boolean writeActivated;
MemChunk firstChunk;
MemChunk lastChunk;
@@ -331,7 +373,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
return lastChunk == null ? 0 : lastChunk.tailOffset();
}
- synchronized public MemChunk seekMemChunk(long diskOffset) {
+ public MemChunk seekMemChunk(long diskOffset) {
MemChunk c = firstChunk;
while (c != null && c.headOffset() <= diskOffset) {
if (diskOffset < c.tailOffset())
@@ -344,7 +386,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public int write(byte[] bytes, int offset, int length, long diskOffset) {
int needMoreMem = 0;
- synchronized (this) {
+ synchronized (lock) {
if (writeActivated == false)
return 0;
@@ -356,7 +398,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
}
- // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+ // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
if (needMoreMem > 0) {
try {
budgetCtrl.reserve(this, needMoreMem);
@@ -366,8 +408,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
}
- synchronized (this) {
- if (needMoreMem > 0) {
+ synchronized (lock) {
+ if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
MemChunk chunk = new MemChunk();
chunk.diskOffset = diskOffset;
chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
@@ -409,11 +451,20 @@ public class GTMemDiskStore implements IGTStore, Closeable {
Thread.sleep(10);
}
flushToDisk();
+
+ if (debug)
+ logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+ synchronized (lock) {
+ asyncFlusher = null;
+ asyncFlushChunk = null;
+ if (ongoingWriter.closed) {
+ ongoingWriter.close(); // call writer.close() again to clean up
+ }
+ }
} catch (Throwable ex) {
asyncFlushException = ex;
}
- if (debug)
- logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
}
};
asyncFlusher.start();
@@ -428,7 +479,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
while (true) {
data = null;
- synchronized (memPart) {
+ synchronized (lock) {
asyncFlushDiskOffset += flushedLen; // bytes written in last loop
if (debug)
logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
@@ -449,44 +500,26 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
}
- public void finishAsyncFlush() throws IOException {
- deactivateMemWrite();
- if (asyncFlusher != null) {
- try {
- asyncFlusher.join();
- } catch (InterruptedException e) {
- logger.warn("", e);
- }
- asyncFlusher = null;
- asyncFlushChunk = null;
-
- if (asyncFlushException != null) {
- if (asyncFlushException instanceof IOException)
- throw (IOException) asyncFlushException;
- else
- throw new IOException(asyncFlushException);
- }
- }
- }
-
@Override
- synchronized public int freeUp(int mb) {
- int mbReleased = 0;
- while (chunkCount > 0 && mbReleased < mb) {
- if (firstChunk == asyncFlushChunk)
- break;
-
- mbReleased += MEM_CHUNK_SIZE_MB;
- chunkCount--;
- if (chunkCount == 0) {
- firstChunk = lastChunk = null;
- } else {
- MemChunk next = firstChunk.next;
- firstChunk.next = null;
- firstChunk = next;
+ public int freeUp(int mb) {
+ synchronized (lock) {
+ int mbReleased = 0;
+ while (chunkCount > 0 && mbReleased < mb) {
+ if (firstChunk == asyncFlushChunk)
+ break;
+
+ mbReleased += MEM_CHUNK_SIZE_MB;
+ chunkCount--;
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = null;
+ } else {
+ MemChunk next = firstChunk.next;
+ firstChunk.next = null;
+ firstChunk = next;
+ }
}
+ return mbReleased;
}
- return mbReleased;
}
public void activateMemWrite() {
@@ -501,16 +534,38 @@ public class GTMemDiskStore implements IGTStore, Closeable {
logger.debug(GTMemDiskStore.this + " mem write de-activated");
}
- synchronized public void clear() {
- budgetCtrl.reserve(this, 0);
+ public void clear() {
chunkCount = 0;
firstChunk = lastChunk = null;
+ budgetCtrl.reserve(this, 0);
}
@Override
public void close() throws IOException {
- finishAsyncFlush();
- clear();
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+ }
+ try {
+ asyncFlusher.join();
+ } catch (NullPointerException npe) {
+ // that's fine, async flusher may not present
+ } catch (InterruptedException e) {
+ logger.warn("async join interrupted", e);
+ }
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+
+ clear();
+ }
+ }
+
+ private void throwAsyncException(Throwable ex) throws IOException {
+ if (ex instanceof IOException)
+ throw (IOException) ex;
+ else
+ throw new IOException(ex);
}
@Override
@@ -524,6 +579,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
final File diskFile;
FileChannel writeChannel;
FileChannel readChannel;
+ int readerCount = 0; // allow parallel readers
long tailOffset;
DiskPart(File diskFile) throws IOException {
@@ -534,8 +590,10 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
public void openRead() throws IOException {
- readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
- tailOffset = diskFile.length();
+ if (readChannel == null) {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ }
+ readerCount++;
}
public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
@@ -543,9 +601,16 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
public void closeRead() throws IOException {
- if (readChannel != null) {
- readChannel.close();
- readChannel = null;
+ closeRead(false);
+ }
+
+ private void closeRead(boolean force) throws IOException {
+ readerCount--;
+ if (readerCount == 0 || force) {
+ if (readChannel != null) {
+ readChannel.close();
+ readChannel = null;
+ }
}
}
@@ -561,9 +626,11 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
- tailOffset = Math.max(diskOffset + n, tailOffset);
- return n;
+ synchronized (lock) {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ tailOffset = Math.max(diskOffset + n, tailOffset);
+ return n;
+ }
}
public void closeWrite() throws IOException {
@@ -580,10 +647,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void close() throws IOException {
- closeWrite();
- closeRead();
- if (delOnClose) {
- diskFile.delete();
+ synchronized (lock) {
+ closeWrite();
+ closeRead(true);
+ if (delOnClose) {
+ diskFile.delete();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
deleted file mode 100644
index 2d1e97e..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.streaming.cube;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
- void write(Long cuboidId, GTRecord record) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
deleted file mode 100644
index 0b0dc4c..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.streaming.cube;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- */
-public class InMemCubeBuilder implements Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
- private static final int DEFAULT_TIMEOUT = 25;
-
- private BlockingQueue<List<String>> queue;
- private CubeDesc desc = null;
- private long baseCuboidId;
- private CuboidScheduler cuboidScheduler = null;
- private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
- private CubeJoinedFlatTableDesc intermediateTableDesc;
- private MeasureCodec measureCodec;
- private String[] metricsAggrFuncs = null;
- private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
- public static final LongWritable ONE = new LongWritable(1l);
- private int[] hbaseMeasureRefIndex;
- private MeasureDesc[] measureDescs;
- private int measureCount;
-
- protected IGTRecordWriter gtRecordWriter;
-
-
- /**
- * @param queue
- * @param cube
- * @param dictionaryMap
- * @param gtRecordWriter
- */
- public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
- if (dictionaryMap == null || dictionaryMap.isEmpty()) {
- throw new IllegalArgumentException("dictionary cannot be empty");
- }
- this.queue = queue;
- this.desc = cube.getDescriptor();
- this.cuboidScheduler = new CuboidScheduler(desc);
- this.dictionaryMap = dictionaryMap;
- this.gtRecordWriter = gtRecordWriter;
- this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
- this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- this.measureCodec = new MeasureCodec(desc.getMeasures());
-
- Map<String, Integer> measureIndexMap = Maps.newHashMap();
- List<String> metricsAggrFuncsList = Lists.newArrayList();
- measureCount = desc.getMeasures().size();
-
- List<MeasureDesc> measureDescsList = Lists.newArrayList();
- hbaseMeasureRefIndex = new int[measureCount];
- int measureRef = 0;
- for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- for (int j = 0; j < measureCount; j++) {
- if (desc.getMeasures().get(j).equals(measure)) {
- measureDescsList.add(measure);
- hbaseMeasureRefIndex[measureRef] = j;
- break;
- }
- }
- measureRef++;
- }
- }
- }
-
- for (int i = 0; i < measureCount; i++) {
- MeasureDesc measureDesc = measureDescsList.get(i);
- metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
- measureIndexMap.put(measureDesc.getName(), i);
- }
- this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
- this.dependentMeasures = Maps.newHashMap();
- for (int i = 0; i < measureCount; i++) {
- String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
- if (depMsrRef != null) {
- int index = measureIndexMap.get(depMsrRef);
- dependentMeasures.put(i, index);
- }
- }
-
- this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
- }
-
-
- private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
- GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
- GTComboStore store = new GTComboStore(info, memStore);
- GridTable gridTable = new GridTable(info, store);
- return gridTable;
- }
-
- private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
- Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
- BitSet parentDimensions = columnBitSets.getFirst();
- BitSet measureColumns = columnBitSets.getSecond();
- BitSet childDimensions = (BitSet) parentDimensions.clone();
-
- long mask = Long.highestOneBit(parentCuboidId);
- long childCuboidId = cuboidId;
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
- int index = 0;
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parentCuboidId) > 0) {
- if ((mask & childCuboidId) == 0) {
- // this dim will be aggregated
- childDimensions.set(index, false);
- }
- index++;
- }
- mask = mask >> 1;
- }
-
- return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
-
- }
-
- private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
- IGTScanner scanner = gridTable.scan(req);
- GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
- GTBuilder builder = newGridTable.rebuild();
-
- BitSet allNeededColumns = new BitSet();
- allNeededColumns.or(aggregationColumns);
- allNeededColumns.or(measureColumns);
-
- GTRecord newRecord = new GTRecord(newGridTable.getInfo());
- int counter = 0;
- ByteArray byteArray = new ByteArray(8);
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
- try {
- BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
- for (Integer i : dependentMeasures.keySet()) {
- dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
- }
-
- Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
- for (GTRecord record : scanner) {
- counter++;
- for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
- newRecord.set(index, record.get(i));
- }
-
- if(dependentMeasures.size() > 0) {
- // update measures which have 'dependent_measure_ref'
- newRecord.getValues(dependentMetrics, hllObjects);
-
- for (Integer i : dependentMeasures.keySet()) {
- for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
- if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
- assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
- byteBuffer.clear();
- BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
- byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
- newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
- }
- }
-
- }
- }
-
- builder.write(newRecord);
- }
- } finally {
- builder.close();
- }
- logger.info("Cuboid " + cuboidId + " has rows: " + counter);
-
- return newGridTable;
- }
-
- private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- BitSet dimension = new BitSet();
- dimension.set(0, bitSet.cardinality());
- BitSet metrics = new BitSet();
- metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
- return new Pair<BitSet, BitSet>(dimension, metrics);
- }
-
- private Object[] buildKey(List<String> row) {
- int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
- Object[] key = new Object[keySize];
-
- for (int i = 0; i < keySize; i++) {
- key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
- }
-
- return key;
- }
-
- private Object[] buildValue(List<String> row) {
-
- Object[] values = new Object[measureCount];
- MeasureDesc measureDesc = null;
-
- for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
- int i = hbaseMeasureRefIndex[position];
- measureDesc = measureDescs[i];
-
- Object value = null;
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
- FunctionDesc function = desc.getMeasures().get(i).getFunction();
- if (function.isCount() || function.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- value = ONE;
- } else if (flatTableIdx == null) {
- value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
- } else if (flatTableIdx.length == 1) {
- value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
- } else {
-
- byte[] result = null;
- for (int x = 0; x < flatTableIdx.length; x++) {
- byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
- if (result == null) {
- result = Arrays.copyOf(split, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- value = measureCodec.getSerializer(i).valueOf(result);
- }
- values[position] = value;
- }
- return values;
- }
-
-
- @Override
- public void run() {
- try {
- logger.info("Create base cuboid " + baseCuboidId);
- final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
-
- GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
- final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
-
- IGTScanner queueScanner = new IGTScanner() {
-
- @Override
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- List<String> currentObject = null;
-
- @Override
- public boolean hasNext() {
- try {
- currentObject = queue.take();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return currentObject != null && currentObject.size() > 0;
- }
-
- @Override
- public GTRecord next() {
- if (currentObject.size() == 0)
- throw new IllegalStateException();
-
- buildGTRecord(currentObject, baseGTRecord);
- return baseGTRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public GTInfo getInfo() {
- return baseCuboidGT.getInfo();
- }
-
- @Override
- public int getScannedRowCount() {
- return 0;
- }
-
- @Override
- public int getScannedRowBlockCount() {
- return 0;
- }
- };
-
- Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
- GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
- IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
-
- int counter = 0;
- for (GTRecord r : aggregationScanner) {
- baseGTBuilder.write(r);
- counter++;
- }
- baseGTBuilder.close();
- aggregationScanner.close();
-
- logger.info("Base cuboid has " + counter + " rows;");
- SimpleGridTableTree tree = new SimpleGridTableTree();
- tree.data = baseCuboidGT;
- tree.id = baseCuboidId;
- tree.parent = null;
- if (counter > 0) {
- List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
- Collections.sort(children);
- for (Long childId : children) {
- createNDCuboidGT(tree, baseCuboidId, childId);
- }
- }
- outputGT(baseCuboidId, baseCuboidGT);
- dropStore(baseCuboidGT);
-
- } catch (IOException e) {
- logger.error("Fail to build cube", e);
- throw new RuntimeException(e);
- }
-
- }
-
- private void buildGTRecord(List<String> row, GTRecord record) {
-
- Object[] dimensions = buildKey(row);
- Object[] metricsValues = buildValue(row);
- Object[] recordValues = new Object[dimensions.length + metricsValues.length];
- System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
- System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
- record.setValues(recordValues);
- }
-
- private boolean gc(TreeNode<GridTable> parentNode) {
- final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
- logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
- for (TreeNode<GridTable> gridTable : gridTables) {
- final GTComboStore store = (GTComboStore) gridTable.data.getStore();
- if (store.memoryUsage() > 0) {
- logger.info("cuboid id:" + gridTable.id + " flush to disk");
- long t = System.currentTimeMillis();
- store.switchToDiskStore();
- logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
- waitForGc();
- return true;
- }
- }
- logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
- return false;
-
- }
-
- private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
- final ExecutorService executorService = Executors.newSingleThreadExecutor();
- final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
- @Override
- public GridTable call() throws Exception {
- return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
- }
- });
- try {
- final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- return gridTable;
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof OutOfMemoryError) {
- logger.warn("Future.get() OutOfMemory, stop the thread");
- } else {
- throw new RuntimeException("this should not happen", e);
- }
- } catch (TimeoutException e) {
- logger.warn("Future.get() timeout, stop the thread");
- }
- logger.info("shutdown executor service");
- final List<Runnable> runnables = executorService.shutdownNow();
- try {
- executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- waitForGc();
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
- }
- return null;
-
- }
-
- private void waitForGc() {
- System.gc();
- logger.info("wait 5 seconds for gc");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException("should not happen", e);
- }
- }
-
- private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
-
- long startTime = System.currentTimeMillis();
-
-// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-// if (parentStore.memoryUsage() <= 0) {
-// long t = System.currentTimeMillis();
-// parentStore.switchToMemStore();
-// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
-// }
-
- GridTable currentCuboid;
- while (true) {
- logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
- currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
- if (currentCuboid != null) {
- break;
- } else {
- logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
- if (gc(parentNode)) {
- continue;
- } else {
- logger.warn("all parent node has been flushed into disk, memory is still insufficient");
- throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
- }
- }
- }
- SimpleGridTableTree node = new SimpleGridTableTree();
- node.parent = parentNode;
- node.data = currentCuboid;
- node.id = cuboidId;
- parentNode.children.add(node);
-
- logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
-
- List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
- if (!children.isEmpty()) {
- Collections.sort(children); // sort cuboids
- for (Long childId : children) {
- createNDCuboidGT(node, cuboidId, childId);
- }
- }
-
-
- //output the grid table
- outputGT(cuboidId, currentCuboid);
- dropStore(currentCuboid);
- parentNode.children.remove(node);
- if (parentNode.children.size() > 0) {
- logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
- ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
- }
- }
-
- private void dropStore(GridTable gt) throws IOException {
- ((GTComboStore) gt.getStore()).drop();
- }
-
-
- private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- this.gtRecordWriter.write(cuboidId, record);
- }
- logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
- }
-
- private static class TreeNode<T> {
- T data;
- long id;
- TreeNode<T> parent;
- List<TreeNode<T>> children = Lists.newArrayList();
-
- List<TreeNode<T>> getAncestorList() {
- ArrayList<TreeNode<T>> result = Lists.newArrayList();
- TreeNode<T> parent = this;
- while (parent != null) {
- result.add(parent);
- parent = parent.parent;
- }
- return Lists.reverse(result);
- }
-
- @Override
- public String toString() {
- return id + "";
- }
- }
-
- private static class SimpleGridTableTree extends TreeNode<GridTable> {
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
deleted file mode 100644
index b530cdc..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.apache.kylin.streaming.cube;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
-
- private static final int BENCHMARK_RECORD_LIMIT = 2000000;
- private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
- final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
- final CubeDesc desc = cubeSegment.getCubeDesc();
- for (DimensionDesc dim : desc.getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (desc.getRowkey().isUseDictionary(col)) {
- Dictionary dict = cubeSegment.getDictionary(col);
- if (dict == null) {
- throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
- }
- logger.info("Dictionary for " + col + " was put into dictionary map.");
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
- }
- }
- return dictionaryMap;
- }
-
- private static class ConsoleGTRecordWriter implements IGTRecordWriter {
-
- boolean verbose = false;
-
- @Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
- if (verbose)
- System.out.println(record.toString());
- }
- }
-
- private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
- BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
- String line;
- int counter = 0;
- while ((line = br.readLine()) != null) {
- queue.put(Arrays.asList(line.split("\t")));
- counter++;
- if (counter == BENCHMARK_RECORD_LIMIT) {
- break;
- }
- }
- queue.put(Collections.emptyList());
- }
-
- private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
- queue.put(Collections.emptyList());
- }
-
-
- @Test
- public void test() throws Exception {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
- final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
- final CubeSegment cubeSegment = cube.getFirstSegment();
-
- LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
-
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cubeBuilder);
- loadDataFromLocalFile(queue);
- future.get();
- logger.info("stream build finished");
- }
-}