You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/12 10:55:40 UTC
[kylin] branch master updated: APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new bef9e5c APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing
bef9e5c is described below
commit bef9e5c13b48f150145327a2648a5cbbd0d425c2
Author: U-CORP\mingmwang <mi...@D-SHC-00437006.corp.ebay.com>
AuthorDate: Thu Oct 12 17:05:52 2017 +0800
APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../cube/inmemcubing/CompoundCuboidWriter.java | 8 +
.../kylin/cube/inmemcubing/DoggedCubeBuilder.java | 4 +-
.../cube/inmemcubing/ICuboidGTTableWriter.java | 47 +++
.../kylin/cube/inmemcubing/ICuboidWriter.java | 3 +
.../kylin/cube/inmemcubing/InMemCubeBuilder.java | 18 +-
.../apache/kylin/cube/inmemcubing2/CuboidTask.java | 53 +++
.../DefaultCuboidCollectorWithCallBack.java | 53 +++
.../cube/inmemcubing2/DoggedCubeBuilder2.java | 442 +++++++++++++++++++++
.../ICuboidCollectorWithCallBack.java} | 62 ++-
.../ICuboidResultListener.java} | 59 ++-
.../kylin/cube/inmemcubing2/InMemCubeBuilder2.java | 408 +++++++++++++++++++
.../mr/steps/InMemCuboidFromBaseCuboidMapper.java | 25 +-
.../kylin/engine/mr/steps/InMemCuboidMapper.java | 23 +-
.../engine/mr/steps/InMemCuboidMapperBase.java | 30 +-
.../kylin/engine/mr/steps/KVGTRecordWriter.java | 4 +-
.../inmemcubing/ITDoggedCubeBuilderStressTest.java | 6 +
.../cube/inmemcubing/ITDoggedCubeBuilderTest.java | 73 +++-
.../cube/inmemcubing/ITInMemCubeBuilderTest.java | 7 +
19 files changed, 1196 insertions(+), 133 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 193329b..018552c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -808,6 +808,10 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.job.cube-auto-ready-enabled", TRUE));
}
+    /**
+     * Returns the fully qualified class name of the in-memory cube builder implementation, read from
+     * {@code kylin.job.cube-inmem-builder-class}; {@code org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder}
+     * is passed to {@code getOptional} as the fallback value.
+     */
+    public String getCubeInMemBuilderClass() {
+        final String defaultBuilderClass = "org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder";
+        return getOptional("kylin.job.cube-inmem-builder-class", defaultBuilderClass);
+    }
+
// ============================================================================
// SOURCE.HIVE
// ============================================================================
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
index c82f418..df77978 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
/**
*/
@@ -39,6 +40,13 @@ public class CompoundCuboidWriter implements ICuboidWriter {
writer.write(cuboidId, record);
}
}
+
+    /**
+     * Forwards the whole grid table of cuboid {@code cuboidId} to each writer in {@code cuboidWriters}.
+     *
+     * @throws IOException if any delegate writer fails; later delegates are then not invoked
+     */
+    @Override
+    public void write(long cuboidId, GridTable table) throws IOException {
+        for (ICuboidWriter delegate : cuboidWriters) {
+            delegate.write(cuboidId, table);
+        }
+    }
@Override
public void flush() throws IOException {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 8368051..39dce26 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
@@ -181,7 +181,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
final RecordConsumeBlockingQueueController<?> inputController;
final InMemCubeBuilder builder;
- ConcurrentNavigableMap<Long, CuboidResult> buildResult;
+ NavigableMap<Long, CuboidResult> buildResult;
RuntimeException exception;
public SplitThread(final int num, final RecordConsumeBlockingQueueController<?> inputController) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
new file mode 100755
index 0000000..93a7994
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base {@link ICuboidWriter} that implements the grid-table overload of {@code write} by scanning the table and
+ * forwarding every record to {@link #write(long, GTRecord)}. Subclasses still provide the per-record
+ * {@code write}, {@code flush()} and {@code close()}.
+ */
+public abstract class ICuboidGTTableWriter implements ICuboidWriter {
+
+    private static final Logger logger = LoggerFactory.getLogger(ICuboidGTTableWriter.class);
+
+    /**
+     * Writes all records of {@code gridTable} as cuboid {@code cuboidId} and logs the elapsed time.
+     * The table scanner is closed even when a record write fails.
+     *
+     * @throws IOException if scanning the table or writing a record fails
+     */
+    @Override
+    public void write(long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        // null ranges / dimensions / filter: presumably a full, unfiltered scan of the table
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest();
+        IGTScanner scanner = gridTable.scan(req);
+        try {
+            for (GTRecord record : scanner) {
+                write(cuboidId, record);
+            }
+        } finally {
+            // previously the scanner leaked if write(cuboidId, record) threw
+            scanner.close();
+        }
+        logger.info("Cuboid {} output takes {}ms", cuboidId, System.currentTimeMillis() - startTime);
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 3f6cb0c..4ae182e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
/**
*/
@@ -28,6 +29,8 @@ public interface ICuboidWriter {
void write(long cuboidId, GTRecord record) throws IOException;
+ void write(long cuboidId, GridTable table) throws IOException;
+
void flush() throws IOException;
void close() throws IOException;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e0bdb20..9661fa8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -19,14 +19,13 @@
package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +46,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.measure.topn.Counter;
import org.apache.kylin.measure.topn.TopNCounter;
import org.apache.kylin.metadata.datatype.DoubleMutable;
@@ -111,7 +111,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
new CubeDimEncMap(cubeDesc, dictionaryMap)
);
- ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        // The store implementations below have very similar performance; ConcurrentDiskStore is the simplest.
+ // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+ // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+ IGTStore store = new ConcurrentDiskStore(info);
GridTable gridTable = new GridTable(info, store);
return gridTable;
@@ -120,7 +123,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
@Override
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
- ConcurrentNavigableMap<Long, CuboidResult> result = build(
+ NavigableMap<Long, CuboidResult> result = build(
RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input));
try {
for (CuboidResult cuboidResult : result.values()) {
@@ -132,9 +135,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
+ public <T> NavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
throws IOException {
- final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
+ final NavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
build(input, new ICuboidCollector() {
@Override
public void collect(CuboidResult cuboidResult) {
@@ -213,7 +216,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private void throwExceptionIfAny() throws IOException {
- ArrayList<Throwable> errors = new ArrayList<>();
+ List<Throwable> errors = Lists.newArrayList();
+
for (int i = 0; i < taskThreadCount; i++) {
Throwable t = taskThreadExceptions[i];
if (t != null)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
new file mode 100755
index 0000000..cf54eb6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Fork/join task that builds one child cuboid ({@link #childCuboidId}) from its parent's {@link CuboidResult}
+ * by delegating to {@link InMemCubeBuilder2#buildCuboid(CuboidTask)}. Tasks are ordered by child cuboid id.
+ */
+@SuppressWarnings("serial")
+class CuboidTask extends RecursiveTask<CuboidResult> implements Comparable<CuboidTask> {
+    final CuboidResult parent;
+    final long childCuboidId;
+    final InMemCubeBuilder2 cubeBuilder;
+
+    CuboidTask(CuboidResult parent, long childCuboidId, InMemCubeBuilder2 cubeBuilder) {
+        this.parent = parent;
+        this.childCuboidId = childCuboidId;
+        this.cubeBuilder = cubeBuilder;
+    }
+
+    /**
+     * Orders tasks by ascending child cuboid id. Uses {@link Long#compare} rather than subtraction, which
+     * could overflow for widely separated ids.
+     */
+    @Override
+    public int compareTo(CuboidTask o) {
+        return Long.compare(this.childCuboidId, o.childCuboidId);
+    }
+
+    /**
+     * Builds the child cuboid.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} from the builder, because {@code compute()}
+     *         cannot throw checked exceptions
+     */
+    @Override
+    protected CuboidResult compute() {
+        try {
+            return cubeBuilder.buildCuboid(this);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
new file mode 100755
index 0000000..d7f738d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ICuboidCollectorWithCallBack} that stores each collected cuboid result in a concurrent sorted map keyed
+ * by cuboid id, then passes the result to an optional {@link ICuboidResultListener}.
+ */
+public class DefaultCuboidCollectorWithCallBack implements ICuboidCollectorWithCallBack {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultCuboidCollectorWithCallBack.class);
+
+    // all results collected so far, keyed and ordered by cuboid id
+    final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<>();
+    // notified after each result is stored; may be null
+    final ICuboidResultListener listener;
+
+    /**
+     * @param listener callback invoked for every collected result, or {@code null} for no callback
+     */
+    public DefaultCuboidCollectorWithCallBack(ICuboidResultListener listener) {
+        this.listener = listener;
+    }
+
+    /** Records {@code cuboidResult} under its cuboid id, then notifies the listener if one was given. */
+    @Override
+    public void collectAndNotify(CuboidResult cuboidResult) {
+        logger.info("collecting CuboidResult cuboid id:" + cuboidResult.cuboidId);
+        result.put(cuboidResult.cuboidId, cuboidResult);
+        if (listener == null) {
+            return;
+        }
+        listener.finish(cuboidResult);
+    }
+
+    /** Returns the live (not copied) map of every result collected so far. */
+    @Override
+    public NavigableMap<Long, CuboidResult> getAllResult() {
+        return result;
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
new file mode 100755
index 0000000..4c5da87
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * In-memory cube builder that feeds the input through a chain of splits. Each split owns an
+ * {@link InMemCubeBuilder2} that builds a base cuboid from part of the input. Once all input is consumed, every
+ * split builds its child cuboids on a shared {@link ForkJoinPool}. The per-split results of each cuboid are merged
+ * by {@link CuboidResultWatcher} before being written to the {@link ICuboidWriter}.
+ */
+public class DoggedCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder2.class);
+
+    public DoggedCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+    }
+
+    /**
+     * Builds every cuboid from {@code input} and writes the merged results to {@code output}.
+     * {@code output} is closed before this method returns, on success or failure.
+     */
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        new BuildOnce().build(input, inputConverterUnit, output);
+    }
+
+    /** Driver for a single build invocation: owns the worker pool and the split builders of that build. */
+    private class BuildOnce {
+        public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+                throws IOException {
+            final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
+                    .getQueueController(inputConverterUnit, input);
+
+            // one InMemCubeBuilder2 per input split
+            final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();
+
+            ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
+                @Override
+                public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                    final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
+                    return worker;
+                }
+            };
+
+            // asyncMode = true: FIFO scheduling for forked tasks that are never joined
+            ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
+            CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);
+
+            Stopwatch sw = new Stopwatch();
+            sw.start();
+            logger.info("Dogged Cube Build2 start");
+            try {
+                BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
+                builderPool.execute(task);
+                // Walk the chain of split tasks; each task forks its successor while input remains.
+                do {
+                    builderList.add(task.getInternalBuilder());
+                    // join() rethrows any exception raised while building this split's base cuboid
+                    task.join();
+                    task = task.nextTask();
+                } while (task != null);
+
+                logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
+                for (final InMemCubeBuilder2 builder : builderList) {
+                    builderPool.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            builder.startBuildFromBaseCuboid();
+                        }
+                    });
+                }
+                // loops on this thread until every split's cuboids are built, merged and written
+                resultWatcher.start();
+                logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
+            } catch (Throwable e) {
+                logger.error("Dogged Cube Build2 error", e);
+                if (e instanceof Error)
+                    throw (Error) e;
+                else if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new IOException(e);
+            } finally {
+                // Closing the writer may throw. The nested finally still releases the grid tables and the
+                // worker pool in that case, which the original single finally did not.
+                try {
+                    output.close();
+                } finally {
+                    closeGridTables(builderList);
+                    sw.stop();
+                    builderPool.shutdownNow();
+                    logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
+                    logger.info("Dogged Cube Build2 return");
+                }
+            }
+        }
+
+        /** Closes the grid table of every cuboid result collected by any split builder; failures are only logged. */
+        private void closeGridTables(List<InMemCubeBuilder2> builderList) {
+            for (InMemCubeBuilder2 inMemCubeBuilder : builderList) {
+                for (CuboidResult cuboidResult : inMemCubeBuilder.getResultCollector().getAllResult().values()) {
+                    closeGridTable(cuboidResult.table);
+                }
+            }
+        }
+
+        private void closeGridTable(GridTable gridTable) {
+            try {
+                gridTable.close();
+            } catch (Throwable e) {
+                logger.error("Error closing grid table " + gridTable, e);
+            }
+        }
+    }
+
+    /**
+     * Builds the base cuboid of one input split. If the input is not exhausted afterwards, it forks the task for
+     * the next split, which is then reachable through {@link #nextTask()}.
+     */
+    private class BaseCuboidTask<T> extends RecursiveTask<CuboidResult> {
+        private static final long serialVersionUID = -5408592502260876799L;
+
+        private final int splitSeq;
+        private final ICuboidResultListener resultListener;
+
+        private RecordConsumeBlockingQueueController<T> inputController;
+        private InMemCubeBuilder2 builder;
+
+        // set by the worker in compute(); read by the driver thread after join()
+        private volatile BaseCuboidTask<T> next;
+
+        public BaseCuboidTask(final RecordConsumeBlockingQueueController<T> inputController, int splitSeq,
+                ICuboidResultListener resultListener) {
+            this.inputController = inputController;
+            this.splitSeq = splitSeq;
+            this.resultListener = resultListener;
+            this.builder = new InMemCubeBuilder2(cuboidScheduler, flatDesc, dictionaryMap);
+            builder.setReserveMemoryMB(reserveMemoryMB);
+            builder.setConcurrentThreads(taskThreadCount);
+            logger.info("Split #" + splitSeq + " kickoff");
+        }
+
+        @Override
+        protected CuboidResult compute() {
+            try {
+                CuboidResult baseCuboidResult = builder.buildBaseCuboid(inputController, resultListener);
+                if (!inputController.ifEnd()) {
+                    next = new BaseCuboidTask<>(inputController, splitSeq + 1, resultListener);
+                    next.fork();
+                }
+                logger.info("Split #" + splitSeq + " finished");
+                return baseCuboidResult;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public InMemCubeBuilder2 getInternalBuilder() {
+            return builder;
+        }
+
+        /** Returns the task for the following split, or {@code null} if this split consumed the rest of the input. */
+        public BaseCuboidTask<T> nextTask() {
+            return next;
+        }
+    }
+
+    /**
+     * Watches cube building. It queues the cuboid results reported by the split builders and merges the results of
+     * a cuboid once every split has reported it. It then writes the merged results and fails fast if a child cuboid
+     * task completed abnormally.
+     */
+    private class CuboidResultWatcher implements ICuboidResultListener {
+        final BlockingQueue<CuboidResult> outputQueue;
+        // cuboid id -> split results received so far, kept until every split has reported that cuboid
+        final Map<Long, List<CuboidResult>> pendingQueue = Maps.newHashMap();
+        final List<InMemCubeBuilder2> builderList;
+        final ICuboidWriter output;
+
+        public CuboidResultWatcher(final List<InMemCubeBuilder2> builderList, final ICuboidWriter output) {
+            this.outputQueue = Queues.newLinkedBlockingQueue();
+            this.builderList = builderList;
+            this.output = output;
+        }
+
+        /**
+         * Runs the merge/output loop on the calling thread. It returns once the output queue and the pending
+         * results are empty and every split builder reports all cuboids done.
+         *
+         * @throws RuntimeException if a child cuboid task failed or the thread was interrupted
+         */
+        public void start() throws IOException {
+            SplitMerger merger = new SplitMerger();
+            while (true) {
+                if (!outputQueue.isEmpty()) {
+                    List<CuboidResult> splitResultReturned = Lists.newArrayList();
+                    outputQueue.drainTo(splitResultReturned);
+                    for (CuboidResult splitResult : splitResultReturned) {
+                        if (builderList.size() == 1) {
+                            merger.mergeAndOutput(Lists.newArrayList(splitResult), output);
+                        } else {
+                            List<CuboidResult> cuboidResultList = pendingQueue.get(splitResult.cuboidId);
+                            if (cuboidResultList == null) {
+                                cuboidResultList = Lists.newArrayListWithExpectedSize(builderList.size());
+                                cuboidResultList.add(splitResult);
+                                pendingQueue.put(splitResult.cuboidId, cuboidResultList);
+                            } else {
+                                cuboidResultList.add(splitResult);
+                            }
+                            // every split has now produced this cuboid: merge and emit it
+                            if (cuboidResultList.size() == builderList.size()) {
+                                merger.mergeAndOutput(cuboidResultList, output);
+                                pendingQueue.remove(splitResult.cuboidId);
+                            }
+                        }
+                    }
+                }
+
+                boolean jobFinished = isAllBuildFinished();
+                if (outputQueue.isEmpty() && !jobFinished) {
+                    // surface child-task failures; only sleep if no task completed since the last check
+                    boolean ifWait = true;
+                    for (InMemCubeBuilder2 builder : builderList) {
+                        Queue<CuboidTask> queue = builder.getCompletedTaskQueue();
+                        while (queue.size() > 0) {
+                            CuboidTask childTask = queue.poll();
+                            if (childTask.isCompletedAbnormally()) {
+                                throw new RuntimeException(childTask.getException());
+                            }
+                            ifWait = false;
+                        }
+                    }
+                    if (ifWait) {
+                        try {
+                            Thread.sleep(100L);
+                        } catch (InterruptedException e) {
+                            // preserve the interrupt status for callers further up the stack
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                } else if (outputQueue.isEmpty() && pendingQueue.isEmpty() && jobFinished) {
+                    return;
+                }
+            }
+        }
+
+        private boolean isAllBuildFinished() {
+            for (InMemCubeBuilder2 split : builderList) {
+                if (!split.isAllCuboidDone()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Enqueues a finished split result for the watcher loop. The queue comes from
+         * {@code Queues.newLinkedBlockingQueue()} (unbounded), so the retry loop is only a safeguard. It gives up
+         * with a RuntimeException after about one hour.
+         */
+        @Override
+        public void finish(CuboidResult result) {
+            Stopwatch stopwatch = new Stopwatch().start();
+            int nRetries = 0;
+            while (!outputQueue.offer(result)) {
+                nRetries++;
+                long sleepTime = stopwatch.elapsedMillis();
+                if (sleepTime > 3600000L) {
+                    stopwatch.stop();
+                    throw new RuntimeException(
+                            "OutputQueue Full. Cannot offer to the output queue after waiting for one hour!!! Current queue size: "
+                                    + outputQueue.size());
+                }
+                logger.warn("OutputQueue Full. Queue size: " + outputQueue.size() + ". Total sleep time : " + sleepTime
+                        + ", and retry count : " + nRetries);
+                try {
+                    Thread.sleep(5000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+            stopwatch.stop();
+        }
+    }
+
+    /**
+     * Merges the split results of one cuboid into a single output stream. Records with equal primary keys are
+     * aggregated into one record. The merge assumes each split's table scans in ascending key order
+     * (TODO confirm).
+     */
+    private class SplitMerger {
+        MeasureAggregators reuseAggrs;
+        Object[] reuseMetricsArray;
+        ByteArray reuseMetricsSpace;
+
+        long lastCuboidColumnCount;
+        ImmutableBitSet lastMetricsColumns;
+
+        SplitMerger() {
+            reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+            reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
+        }
+
+        public void mergeAndOutput(List<CuboidResult> splitResultList, ICuboidWriter output) throws IOException {
+            // single split: nothing to merge, write the table as-is
+            if (splitResultList.size() == 1) {
+                CuboidResult cuboidResult = splitResultList.get(0);
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                return;
+            }
+            LinkedList<ResultMergeSlot> open = Lists.newLinkedList();
+            for (CuboidResult splitResult : splitResultList) {
+                open.add(new ResultMergeSlot(splitResult));
+            }
+
+            PriorityQueue<ResultMergeSlot> heap = new PriorityQueue<ResultMergeSlot>();
+            while (true) {
+                // ready records in open slots and add to heap
+                while (!open.isEmpty()) {
+                    ResultMergeSlot slot = open.removeFirst();
+                    if (slot.fetchNext()) {
+                        heap.add(slot);
+                    }
+                }
+
+                // find the smallest on heap
+                ResultMergeSlot smallest = heap.poll();
+                if (smallest == null)
+                    break;
+                open.add(smallest);
+
+                // merge with slots having the same key
+                if (smallest.isSameKey(heap.peek())) {
+                    Object[] metrics = getMetricsValues(smallest.currentRecord);
+                    reuseAggrs.reset();
+                    reuseAggrs.aggregate(metrics);
+                    do {
+                        ResultMergeSlot slot = heap.poll();
+                        open.add(slot);
+                        metrics = getMetricsValues(slot.currentRecord);
+                        reuseAggrs.aggregate(metrics);
+                    } while (smallest.isSameKey(heap.peek()));
+
+                    reuseAggrs.collectStates(metrics);
+                    setMetricsValues(smallest.currentRecord, metrics);
+                }
+                output.write(smallest.currentCuboidId, smallest.currentRecord);
+            }
+        }
+
+        private void setMetricsValues(GTRecord record, Object[] metricsValues) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+
+            if (reuseMetricsSpace == null) {
+                reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
+            }
+
+            record.setValues(metrics, reuseMetricsSpace, metricsValues);
+        }
+
+        private Object[] getMetricsValues(GTRecord record) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+            return record.getValues(metrics, reuseMetricsArray);
+        }
+
+        private ImmutableBitSet getMetricsColumns(GTRecord record) {
+            // metrics columns always come after dimension columns
+            if (lastCuboidColumnCount == record.getInfo().getColumnCount())
+                return lastMetricsColumns;
+
+            int to = record.getInfo().getColumnCount();
+            int from = to - reuseMetricsArray.length;
+            lastCuboidColumnCount = record.getInfo().getColumnCount();
+            lastMetricsColumns = new ImmutableBitSet(from, to);
+            return lastMetricsColumns;
+        }
+    }
+
+    /** Cursor over one split's table during the merge; ordered by cuboid id, then by primary-key columns. */
+    private static class ResultMergeSlot implements Comparable<ResultMergeSlot> {
+        CuboidResult splitResult;
+        IGTScanner scanner;
+        Iterator<GTRecord> recordIterator;
+
+        long currentCuboidId;
+        GTRecord currentRecord;
+
+        public ResultMergeSlot(CuboidResult splitResult) {
+            this.splitResult = splitResult;
+        }
+
+        /** Advances to the next record; opens the scanner lazily and closes it once exhausted. */
+        public boolean fetchNext() throws IOException {
+            if (recordIterator == null) {
+                currentCuboidId = splitResult.cuboidId;
+                scanner = splitResult.table.scan(new GTScanRequestBuilder().setInfo(splitResult.table.getInfo())
+                        .setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
+                recordIterator = scanner.iterator();
+            }
+
+            if (recordIterator.hasNext()) {
+                currentRecord = recordIterator.next();
+                return true;
+            } else {
+                scanner.close();
+                recordIterator = null;
+                return false;
+            }
+        }
+
+        @Override
+        public int compareTo(ResultMergeSlot o) {
+            // Long.compare instead of subtraction, which could overflow
+            int cuboidComp = Long.compare(this.currentCuboidId, o.currentCuboidId);
+            if (cuboidComp != 0)
+                return cuboidComp;
+
+            // note GTRecord.equals() don't work because the two GTRecord comes from different GridTable
+            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
+            for (int i = 0; i < pk.trueBitCount(); i++) {
+                int c = pk.trueBitAt(i);
+                int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
+                if (comp != 0)
+                    return comp;
+            }
+            return 0;
+        }
+
+        public boolean isSameKey(ResultMergeSlot o) {
+            if (o == null)
+                return false;
+            else
+                return this.compareTo(o) == 0;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
old mode 100644
new mode 100755
similarity index 72%
copy from core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
copy to core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
index 3f6cb0c..b669bbe
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
@@ -1,34 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.IOException;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-/**
- */
-public interface ICuboidWriter {
-
- void write(long cuboidId, GTRecord record) throws IOException;
-
- void flush() throws IOException;
-
- void close() throws IOException;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Gathers {@link CuboidResult}s as cuboids finish and exposes everything gathered as a single sorted map.
+ */
+public interface ICuboidCollectorWithCallBack {
+
+    /**
+     * Accepts one finished cuboid result. NOTE(review): judging by the name, implementations also forward the
+     * result to a callback; confirm against DefaultCuboidCollectorWithCallBack.
+     *
+     * @param cuboidResult the finished result
+     */
+    void collectAndNotify(CuboidResult cuboidResult);
+
+    /**
+     * Returns every result collected so far. NOTE(review): keys are presumably cuboid ids; confirm.
+     *
+     * @return the collected results
+     */
+    NavigableMap<Long, CuboidResult> getAllResult();
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
old mode 100644
new mode 100755
similarity index 72%
copy from core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
copy to core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
index 3f6cb0c..6d80f00
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
@@ -1,34 +1,25 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.IOException;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-/**
- */
-public interface ICuboidWriter {
-
- void write(long cuboidId, GTRecord record) throws IOException;
-
- void flush() throws IOException;
-
- void close() throws IOException;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Callback for finished cuboids. NOTE(review): presumably invoked by DefaultCuboidCollectorWithCallBack for each
+ * collected result; confirm.
+ */
+public interface ICuboidResultListener {
+
+    /**
+     * Receives one finished cuboid result.
+     *
+     * @param result the finished result
+     */
+    void finish(CuboidResult result);
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
new file mode 100755
index 0000000..35a4d09
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderUtils;
+import org.apache.kylin.cube.inmemcubing.InputConverter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
+ * <p>
+ * The base cuboid is aggregated first, on the calling thread. Every other cuboid is then derived from its
+ * parent by a {@link CuboidTask} forked into a per-build {@link ForkJoinPool}. Finished results are handed to an
+ * {@link ICuboidCollectorWithCallBack}.
+ */
+public class InMemCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilder2.class);
+
+    // by experience
+    private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
+    private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
+
+    protected final String[] metricsAggrFuncs;
+    protected final MeasureDesc[] measureDescs;
+    protected final int measureCount;
+
+    // Created by makeMemoryBudget(), which only runs when the base cuboid has at least one row.
+    private MemoryBudgetController memBudget;
+    protected final long baseCuboidId;
+    private CuboidResult baseResult;
+
+    private Queue<CuboidTask> completedTaskQueue;
+    // Cuboids finished so far (base included); isAllCuboidDone() compares it with the scheduler's cuboid count.
+    private AtomicInteger taskCuboidCompleted;
+
+    private ICuboidCollectorWithCallBack resultCollector;
+
+    public InMemCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescs[i];
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+    }
+
+    /**
+     * Returns the aggregation-cache size (MB) recorded for the base cuboid. Only meaningful after
+     * {@link #buildBaseCuboid} has run.
+     */
+    public int getBaseResultCacheMB() {
+        return baseResult.aggrCacheMB;
+    }
+
+    /** Creates an empty grid table for {@code cuboidID}, backed by a {@link ConcurrentDiskStore}. */
+    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+        GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID),
+                new CubeDimEncMap(cubeDesc, dictionaryMap));
+
+        // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        IGTStore store = new ConcurrentDiskStore(info);
+
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
+    /**
+     * Builds every cuboid from {@code input}, then writes each collected cuboid to {@code output}.
+     * Each table is closed after it is written, and {@code output} is closed even if writing fails.
+     */
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        NavigableMap<Long, CuboidResult> result = buildAndCollect(
+                RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input), null);
+        try {
+            for (CuboidResult cuboidResult : result.values()) {
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                cuboidResult.table.close();
+            }
+        } finally {
+            output.close();
+        }
+    }
+
+    /**
+     * Build all the cuboids and wait for all the tasks finished.
+     *
+     * @param input    controller feeding the input records into the base cuboid
+     * @param listener callback handed to the result collector ({@link #build} passes {@code null})
+     * @return every collected cuboid result, as reported by the collector's {@code getAllResult()}
+     * @throws IOException if building the base cuboid fails
+     */
+    private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+
+        long startTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 start, {}", cubeDesc.getName());
+
+        // build base cuboid
+        buildBaseCuboid(input, listener);
+
+        ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
+            @Override
+            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
+                return worker;
+            }
+        };
+        ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
+        try {
+            ForkJoinTask<?> rootTask = builderPool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    startBuildFromBaseCuboid();
+                }
+            });
+            rootTask.join();
+        } finally {
+            // A fresh pool is created for every build; release its worker threads once the work is done.
+            builderPool.shutdown();
+        }
+
+        long endTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 end, {}, takes {} ms", cubeDesc.getName(), endTime - startTime);
+        logger.info("total CuboidResult count:{}", resultCollector.getAllResult().size());
+        return resultCollector.getAllResult();
+    }
+
+    public ICuboidCollectorWithCallBack getResultCollector() {
+        return resultCollector;
+    }
+
+    /**
+     * Resets the per-build state (completed-task queue, completion counter, result collector) and aggregates
+     * {@code input} into the base cuboid.
+     * <p>
+     * If the base cuboid has no rows, every cuboid is counted as done and no memory budget is created.
+     * Otherwise the base aggregation-cache size is estimated (10 MB minimum) and the memory budget is derived
+     * from it.
+     *
+     * @param input    controller feeding the input records
+     * @param listener callback handed to the {@link DefaultCuboidCollectorWithCallBack}
+     * @return the base cuboid result
+     * @throws IOException if reading the input or writing the base grid table fails
+     */
+    public <T> CuboidResult buildBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+        completedTaskQueue = new LinkedBlockingQueue<CuboidTask>();
+        taskCuboidCompleted = new AtomicInteger(0);
+
+        resultCollector = new DefaultCuboidCollectorWithCallBack(listener);
+
+        MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker = new MemoryWaterLevel();
+        baseCuboidMemTracker.markLow();
+        baseResult = createBaseCuboid(input, baseCuboidMemTracker);
+
+        if (baseResult.nRows == 0) {
+            taskCuboidCompleted.set(cuboidScheduler.getCuboidCount());
+            return baseResult;
+        }
+
+        baseCuboidMemTracker.markLow();
+        baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal
+
+        makeMemoryBudget();
+        return baseResult;
+    }
+
+    /**
+     * Builds the child cuboid of {@code task} from its parent and records the task in the completed-task queue.
+     * It then forks and joins tasks for that cuboid's own children.
+     */
+    public CuboidResult buildCuboid(CuboidTask task) throws IOException {
+        CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+        completedTaskQueue.add(task);
+        addChildTasks(newCuboid);
+        return newCuboid;
+    }
+
+    /** Aggregates {@code parent} into {@code cuboidId} while holding a memory reservation of the parent's size. */
+    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final String consumerName = "AggrCache@Cuboid " + cuboidId;
+        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
+            @Override
+            public int freeUp(int mb) {
+                return 0; // cannot free up on demand
+            }
+
+            @Override
+            public String toString() {
+                return consumerName;
+            }
+        };
+
+        // reserve memory for aggregation cache, can't be larger than the parent
+        memBudget.reserveInsist(consumer, parent.aggrCacheMB);
+        try {
+            return aggregateCuboid(parent, cuboidId);
+        } finally {
+            memBudget.reserve(consumer, 0);
+        }
+    }
+
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() == cuboidScheduler.getCuboidCount();
+    }
+
+    /**
+     * Forks the tasks for the base cuboid's children and waits for the whole cuboid tree to finish.
+     * <p>
+     * Does nothing when the base cuboid is empty. In that case buildBaseCuboid has already counted every
+     * cuboid as done, and it never created the memory budget that child builds reserve from. Forking children
+     * anyway would fail with a NullPointerException in buildCuboid.
+     */
+    public void startBuildFromBaseCuboid() {
+        if (baseResult.nRows == 0) {
+            return;
+        }
+        addChildTasks(baseResult);
+    }
+
+    /** Forks one {@link CuboidTask} per spanning child of {@code parent}, then joins all of them. */
+    private void addChildTasks(CuboidResult parent) {
+        List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+        if (children != null && !children.isEmpty()) {
+            List<CuboidTask> childTasks = Lists.newArrayListWithExpectedSize(children.size());
+            for (Long child : children) {
+                CuboidTask task = new CuboidTask(parent, child, this);
+                childTasks.add(task);
+                task.fork();
+            }
+            for (CuboidTask childTask : childTasks) {
+                childTask.join();
+            }
+        }
+    }
+
+    public Queue<CuboidTask> getCompletedTaskQueue() {
+        return completedTaskQueue;
+    }
+
+    /**
+     * Sizes the memory budget as system-available memory minus the configured reserve.
+     * The budget is never smaller than the base aggregation cache.
+     */
+    private void makeMemoryBudget() {
+        int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB();
+        logger.info("System avail {} MB", systemAvailMB);
+        int reserve = reserveMemoryMB;
+        logger.info("Reserve {} MB for system basics", reserve);
+
+        int budget = systemAvailMB - reserve;
+        if (budget < baseResult.aggrCacheMB) {
+            // make sure we have base aggr cache as minimal
+            budget = baseResult.aggrCacheMB;
+            logger.warn("System avail memory ({} MB) is less than base aggr cache ({} MB) + minimal reservation ({} MB), consider increase JVM heap -Xmx",
+                    systemAvailMB, baseResult.aggrCacheMB, reserve);
+        }
+
+        logger.info("Memory Budget is {} MB", budget);
+        memBudget = new MemoryBudgetController(budget);
+    }
+
+    /**
+     * Aggregates the raw input into a new grid table for the base cuboid while tracking memory usage.
+     * The scanner and table builder are closed even if aggregation fails.
+     */
+    private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker) throws IOException {
+        logger.info("Calculating base cuboid {}", baseCuboidId);
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+        GTBuilder baseBuilder = baseCuboid.rebuild();
+        IGTScanner baseInput = new InputConverter<>(baseCuboid.getInfo(), input);
+
+        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond())
+                .setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+        aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
+
+        int count = 0;
+        try {
+            for (GTRecord r : aggregationScanner) {
+                if (count == 0) {
+                    // sample the memory high-water mark when the first aggregated row comes out
+                    baseCuboidMemTracker.markHigh();
+                }
+                baseBuilder.write(r);
+                count++;
+            }
+        } finally {
+            // same close order as before, now also on failure (consistent with scanAndAggregateGridTable)
+            aggregationScanner.close();
+            baseBuilder.close();
+        }
+
+        sw.stop();
+        logger.info("Cuboid {} has {} rows, build takes {}ms", baseCuboidId, count, sw.elapsedMillis());
+
+        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache()
+                / MemoryBudgetController.ONE_MB);
+        logger.info("Wild estimate of base aggr cache is {} MB", mbEstimateBaseAggrCache);
+
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, sw.elapsedMillis(), 0,
+                input.inputConverterUnit.ifChange());
+    }
+
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent,
+            int aggrCacheMB) {
+        return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true);
+    }
+
+    /**
+     * Wraps a finished cuboid in a {@link CuboidResult}, counts it as completed, and hands it to the result
+     * collector when {@code ifCollect} is true.
+     * <p>
+     * A non-positive {@code aggrCacheMB} is replaced by an estimate once the base result exists. The estimate
+     * is scaled from the base cuboid's row count and cache size.
+     */
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB,
+            boolean ifCollect) {
+        if (aggrCacheMB <= 0 && baseResult != null) {
+            aggrCacheMB = (int) Math.round(
+                    (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) //
+                            * baseResult.aggrCacheMB);
+        }
+
+        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+        taskCuboidCompleted.incrementAndGet();
+
+        if (ifCollect) {
+            resultCollector.collectAndNotify(result);
+        }
+        return result;
+    }
+
+    /** Derives cuboid {@code cuboidId} from {@code parent} into a freshly created grid table. */
+    protected CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final Pair<ImmutableBitSet, ImmutableBitSet> allNeededColumns = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount);
+        return scanAndAggregateGridTable(parent.table, newGridTableByCuboidID(cuboidId), parent.cuboidId,
+                cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
+    }
+
+    /**
+     * Opens an aggregating scan over {@code gridTable}. For a child cuboid, measures whose type only
+     * aggregates in the base cuboid are masked out of aggregation.
+     */
+    private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId,
+            ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        GTInfo info = gridTable.getInfo();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs)
+                .setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+
+        // for child cuboid, some measures don't need aggregation.
+        if (parentId != cuboidId) {
+            boolean[] aggrMask = new boolean[measureDescs.length];
+            for (int i = 0; i < measureDescs.length; i++) {
+                aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid();
+
+                if (!aggrMask[i]) {
+                    logger.info("{} doesn't need aggregation.", measureDescs[i]);
+                }
+            }
+            scanner.setAggrMask(aggrMask);
+        }
+
+        return scanner;
+    }
+
+    /**
+     * Aggregates {@code gridTable} on the given columns and copies the needed columns of each output row, in
+     * order, into {@code newGridTable}.
+     */
+    protected CuboidResult scanAndAggregateGridTable(GridTable gridTable, GridTable newGridTable, long parentId,
+            long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        logger.info("Calculating cuboid {}", cuboidId);
+
+        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns,
+                measureColumns);
+        GTBuilder builder = newGridTable.rebuild();
+
+        ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int count = 0;
+        try {
+            for (GTRecord record : scanner) {
+                count++;
+                for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
+                    int c = allNeededColumns.trueBitAt(i);
+                    newRecord.set(i, record.get(c));
+                }
+                builder.write(newRecord);
+            }
+        } finally {
+            scanner.close();
+            builder.close();
+        }
+        sw.stop();
+        logger.info("Cuboid {} has {} rows, build takes {}ms", cuboidId, count, sw.elapsedMillis());
+
+        return updateCuboidResult(cuboidId, newGridTable, count, sw.elapsedMillis(), 0);
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
index 1beebc7..fc6edd3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -20,31 +20,21 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class InMemCuboidFromBaseCuboidMapper
extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, ByteArrayWritable, ByteArray> {
@@ -75,16 +65,8 @@ public class InMemCuboidFromBaseCuboidMapper
}
@Override
- protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
- int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
- AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
- cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
- cubeBuilder.setConcurrentThreads(taskThreadCount);
-
- ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
- return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
- new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+ protected ICuboidWriter getCuboidWriter(Context context) {
+        // Cubing output goes straight to the MR context; InMemCuboidMapperBase hands this writer to the cube builder.
+ return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
}
@Override
@@ -98,5 +80,4 @@ public class InMemCuboidFromBaseCuboidMapper
return new ByteArray(keyValue);
}
-
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 551a17b..d363afc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -19,24 +19,15 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class InMemCuboidMapper<KEYIN>
extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
@@ -63,15 +54,7 @@ public class InMemCuboidMapper<KEYIN>
}
@Override
- protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
- int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
- AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
- cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
- cubeBuilder.setConcurrentThreads(taskThreadCount);
-
- ExecutorService executorService = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
- return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
- new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+ protected ICuboidWriter getCuboidWriter(Context context) {
+        // Cubing output goes straight to the MR context; InMemCuboidMapperBase hands this writer to the cube builder.
+ return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index e95ce8a..ce08b5c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -21,10 +21,13 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
@@ -33,7 +36,10 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
@@ -74,11 +80,10 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
protected abstract InputConverterUnit<T> getInputConverterUnit(Context context);
- protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
- int reserveMemoryMB, CuboidScheduler cuboidScheduler);
-
protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
+    /**
+     * Creates the writer passed to the in-memory cube builder started in doSetup; it receives every built cuboid.
+     */
+ protected abstract ICuboidWriter getCuboidWriter(Context context);
+
@Override
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
@@ -106,7 +111,24 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
reserveMemoryMB = calculateReserveMB(conf);
inputConverterUnit = getInputConverterUnit(context);
- future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler);
+
+ AbstractInMemCubeBuilder cubeBuilder;
+ try {
+ cubeBuilder = (AbstractInMemCubeBuilder) Class.forName(cubeSegment.getConfig().getCubeInMemBuilderClass())
+ .getConstructor(CuboidScheduler.class, IJoinedFlatTableDesc.class, Map.class)
+ .newInstance(cuboidScheduler, flatDesc, dictionaryMap);
+ } catch (Exception e) {
+ logger.warn("Fail to initialize cube builder by class name "
+ + cubeSegment.getConfig().getCubeInMemBuilderClass() + " due to " + e);
+ cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+ }
+ cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+ cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
+ future = executorService
+ .submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, getCuboidWriter(context)));
}
private int calculateReserveMB(Configuration configuration) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index 60d0870..43b7ee2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.ICuboidGTTableWriter;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
*/
-public abstract class KVGTRecordWriter implements ICuboidWriter {
+public abstract class KVGTRecordWriter extends ICuboidGTTableWriter {
private static final Logger logger = LoggerFactory.getLogger(KVGTRecordWriter.class);
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index be3d759..695455b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -32,6 +32,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -101,6 +102,11 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
}
@Override
+ public void write(long cuboidId, GridTable table) throws IOException {
+ // Intentionally a no-op: this writer discards whole-GridTable output.
+ }
+
+ @Override
public void flush() {
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 8fcf9ed..6cfec84 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -37,8 +37,11 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing2.DoggedCubeBuilder2;
+import org.apache.kylin.cube.inmemcubing2.InMemCubeBuilder2;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -48,6 +51,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
/**
*/
public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
@@ -89,10 +94,19 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
long randSeed = System.currentTimeMillis();
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+ InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+ inmemBuilder.setConcurrentThreads(THREADS);
+ FileRecordWriter inmemResult = new FileRecordWriter();
+ {
+ Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ inmemResult.close();
+ }
+
DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
doggedBuilder.setConcurrentThreads(THREADS);
FileRecordWriter doggedResult = new FileRecordWriter();
-
{
Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
@@ -100,20 +114,34 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
doggedResult.close();
}
- InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
- inmemBuilder.setConcurrentThreads(THREADS);
- FileRecordWriter inmemResult = new FileRecordWriter();
-
+ InMemCubeBuilder2 inmemBuilder2 = new InMemCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+ inmemBuilder2.setConcurrentThreads(THREADS);
+ FileRecordWriter inmemResult2 = new FileRecordWriter();
{
- Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+ Future<?> future = executorService.submit(inmemBuilder2.buildAsRunnable(queue, inmemResult2));
ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
future.get();
- inmemResult.close();
+ inmemResult2.close();
}
+ DoggedCubeBuilder2 doggedBuilder2 = new DoggedCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+ doggedBuilder2.setConcurrentThreads(THREADS);
+ FileRecordWriter doggedResult2 = new FileRecordWriter();
+ {
+ Future<?> future = executorService.submit(doggedBuilder2.buildAsRunnable(queue, doggedResult2));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
+ future.get();
+ doggedResult2.close();
+ }
+
+ fileCompare(inmemResult.file, inmemResult2.file);
fileCompare(inmemResult.file, doggedResult.file);
- doggedResult.file.delete();
+ fileCompare2(inmemResult.file, doggedResult2.file);
+
inmemResult.file.delete();
+ inmemResult2.file.delete();
+ doggedResult.file.delete();
+ doggedResult2.file.delete();
}
private void fileCompare(File file, File file2) throws IOException {
@@ -133,6 +161,27 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
r2.close();
}
+    /**
+     * Asserts that two result files contain the same lines with the same number of
+     * occurrences each, regardless of the order in which those lines appear.
+     */
+    private void fileCompare2(File file, File file2) throws IOException {
+        assertEquals(readContents(file), readContents(file2));
+    }
+
+    /**
+     * Reads a text file as UTF-8 and counts how many times each distinct line occurs.
+     * The reader is always closed, including when reading fails part-way.
+     *
+     * @param file the file to read
+     * @return map from line content to its number of occurrences in the file
+     * @throws IOException if the file cannot be opened or read
+     */
+    private Map<String, Integer> readContents(File file) throws IOException {
+        Map<String, Integer> content = Maps.newHashMap();
+        // try-with-resources: the original closed the reader only on the success path.
+        try (BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"))) {
+            String line;
+            while ((line = r.readLine()) != null) {
+                Integer cnt = content.get(line);
+                content.put(line, cnt == null ? 1 : cnt + 1);
+            }
+        }
+        return content;
+    }
+
class FileRecordWriter implements ICuboidWriter {
File file;
@@ -152,6 +201,14 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
}
@Override
+    public void write(long cuboidId, GridTable table) throws IOException {
+        // Emits one line per table: "<cuboidId>, <table.toString()>".
+        writer.print(cuboidId + ", ");
+        writer.println(table.toString());
+    }
+
+ @Override
public void flush() {
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 2a96b39..0353313 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -45,6 +45,7 @@ import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -276,6 +277,12 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
}
@Override
+    public void write(long cuboidId, GridTable table) throws IOException {
+        // Only echo the table to stdout when verbose output is enabled.
+        if (!verbose) {
+            return;
+        }
+        System.out.println(table.toString());
+    }
+
+ @Override
public void flush() {
if (verbose) {
System.out.println("flush");