You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/20 14:44:17 UTC
incubator-kylin git commit: KYLIN-803 Enable InMemCubeBuilder build multiple times
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 01b89861e -> ed87c6f37
KYLIN-803 Enable InMemCubeBuilder build multiple times
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ed87c6f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ed87c6f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ed87c6f3
Branch: refs/heads/0.8
Commit: ed87c6f37fccd824ee61dae11529806dba25c1a9
Parents: 01b8986
Author: Yang Li <li...@apache.org>
Authored: Sat Jun 20 20:43:48 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Jun 20 20:43:48 2015 +0800
----------------------------------------------------------------------
.../kylin/common/util/ImmutableBitSet.java | 2 +
.../job/hadoop/cubev2/InMemCuboidMapper.java | 4 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 76 ++++++
.../job/inmemcubing/DoggedCubeBuilder.java | 237 +++++++++++++++++++
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 95 ++++----
.../kylin/job/streaming/CubeStreamConsumer.java | 4 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 29 ++-
7 files changed, 385 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index 807a03d..f13cd1d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -39,10 +39,12 @@ public class ImmutableBitSet {
}
}
+ /** Returns the number of true bits in this bit set. */
public int trueBitCount() {
return arr.length;
}
+ /** Returns the bit index of the i-th true bit; {@code i} is 0-based and must be less than {@link #trueBitCount()}. */
public int trueBitAt(int i) {
return arr[i];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index fdb4823..4f6f295 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -75,9 +75,9 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
}
}
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- future = executorService.submit(cubeBuilder);
+ future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
new file mode 100644
index 0000000..034c4cd
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Base class of the in-memory cube builders. It holds the cube definition, the column
+ * dictionaries and the tunable parameters shared by all implementations, and nothing more.
+ * Subclasses implement {@link #build(BlockingQueue, ICuboidWriter)}.
+ * <p>
+ * Input rows are taken from a {@link BlockingQueue}; an empty row marks the end of input.
+ */
+public abstract class AbstractInMemCubeBuilder {
+
+    protected final CubeDesc cubeDesc;
+    protected final Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    /** number of worker threads used to compute cuboids concurrently */
+    protected int taskThreadCount = 4;
+    /** whether the output must be ordered (e.g. required by a downstream sort merge) */
+    protected boolean outputOrderRequired = false;
+    /** memory in MB to keep free for the system; how it is applied is up to the subclass */
+    protected int reserveMemoryMB = 100;
+
+    /**
+     * @param cubeDesc the cube definition, must not be null
+     * @param dictionaryMap dictionaries of the cube columns, must not be null or empty
+     * @throws NullPointerException if {@code cubeDesc} is null
+     * @throws IllegalArgumentException if {@code dictionaryMap} is null or empty
+     */
+    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        if (cubeDesc == null)
+            throw new NullPointerException("cubeDesc cannot be null");
+        if (dictionaryMap == null || dictionaryMap.isEmpty())
+            throw new IllegalArgumentException("dictionary cannot be empty");
+
+        this.cubeDesc = cubeDesc;
+        this.dictionaryMap = dictionaryMap;
+    }
+
+    /**
+     * Sets the number of worker threads used to compute cuboids.
+     *
+     * @throws IllegalArgumentException if {@code n} is less than 1 (no worker could make progress)
+     */
+    public void setConcurrentThreads(int n) {
+        if (n < 1)
+            throw new IllegalArgumentException("concurrent threads must be at least 1, but was " + n);
+        this.taskThreadCount = n;
+    }
+
+    /** Sets whether the cuboids must be written to the output in order. */
+    public void setOutputOrder(boolean required) {
+        this.outputOrderRequired = required;
+    }
+
+    /** Sets the amount of memory (MB) to keep free for the system. */
+    public void setReserveMemoryMB(int mb) {
+        this.reserveMemoryMB = mb;
+    }
+
+    /**
+     * Wraps {@link #build(BlockingQueue, ICuboidWriter)} into a {@link Runnable}, e.g. for
+     * submission to an executor. An {@link IOException} is rethrown wrapped in a RuntimeException.
+     */
+    public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    build(input, output);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the cube from the rows in {@code input} and writes every cuboid to {@code gtRecordWriter}.
+     *
+     * @param input source rows; an empty row marks the end of input
+     * @param gtRecordWriter receives the records of each built cuboid
+     * @throws IOException if the build fails
+     */
+    public abstract void build(BlockingQueue<List<String>> input, ICuboidWriter gtRecordWriter) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
new file mode 100644
index 0000000..02f24f7
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a cube whose base cuboid may not fit in memory: the input is cut into multiple splits,
+ * each split is built by its own {@link InMemCubeBuilder} in a separate thread, and the split
+ * outputs are merged at last.
+ * <p>
+ * NOTE(review): {@code Merger.mergeAndOutput} is still a TODO. Until something consumes the merge
+ * slots, a split blocks in {@code MergeSlot.write} (second put) after outputting its first record.
+ */
+public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
+
+    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        super(cubeDesc, dictionaryMap);
+    }
+
+    /**
+     * Builds the cube from {@code input} (terminated by an empty record) into {@code output}.
+     * Every call works on fresh per-build state, so the builder can be invoked repeatedly.
+     */
+    @Override
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        new BuildOnce().build(input, output);
+    }
+
+    /** State and logic of one single build() invocation. */
+    private class BuildOnce {
+
+        public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+            List<SplitThread> splits = new ArrayList<SplitThread>();
+            Merger merger = new Merger();
+            SplitThread last = null;
+            boolean eof = false;
+
+            while (!eof) {
+
+                // running low on memory: let the current split finish and start a new one
+                if (last != null && shouldCutSplit()) {
+                    cutSplit(last);
+                    last = null;
+                }
+
+                checkException(splits);
+
+                if (last == null) {
+                    last = new SplitThread(merger.newMergeSlot());
+                    splits.add(last);
+                    last.start();
+                }
+
+                eof = feedSomeInput(input, last, 1000);
+            }
+
+            // eof may also mean a split failed; never merge the outputs of a failed build
+            checkException(splits);
+
+            merger.mergeAndOutput(splits, output);
+
+            checkException(splits);
+        }
+
+        /** If any split has failed, aborts all splits and throws the failure(s) as an IOException. */
+        private void checkException(List<SplitThread> splits) throws IOException {
+            for (SplitThread split : splits) {
+                if (split.exception != null) {
+                    abort(splits);
+                    return; // abort() throws since a split has failed
+                }
+            }
+        }
+
+        /**
+         * Aborts all splits, waits for their threads to end and throws the collected errors.
+         * A single error is rethrown (wrapped in an IOException unless it already is one);
+         * multiple errors are all logged and the first becomes the cause of the thrown
+         * IOException. Returns normally if no error was collected.
+         */
+        private void abort(List<SplitThread> splits) throws IOException {
+            for (SplitThread split : splits) {
+                // wake the split thread itself too: it may be blocked on its input queue or in
+                // MergeSlot.write, where interrupting the builder's internal threads does not reach it
+                split.interrupt();
+                split.builder.abort();
+            }
+
+            ArrayList<Throwable> errors = new ArrayList<Throwable>();
+            boolean interrupted = false;
+            for (SplitThread split : splits) {
+                try {
+                    split.join();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    errors.add(e);
+                }
+                if (split.exception != null)
+                    errors.add(split.exception);
+            }
+            if (interrupted) {
+                // restore the interrupt status consumed by join(), after all splits were waited for
+                Thread.currentThread().interrupt();
+            }
+
+            if (errors.isEmpty()) {
+                return;
+            } else if (errors.size() == 1) {
+                Throwable t = errors.get(0);
+                if (t instanceof IOException)
+                    throw (IOException) t;
+                else
+                    throw new IOException(t);
+            } else {
+                for (Throwable t : errors)
+                    logger.error("Exception during in-mem cube build", t);
+                throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
+            }
+        }
+
+        /**
+         * Moves up to {@code n} records from {@code input} into the split's input queue.
+         *
+         * @return true if the end-of-input record was passed on or the split has failed
+         */
+        private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
+            try {
+                for (int i = 0; i < n; i++) {
+                    List<String> record = input.take();
+
+                    // offer with timeout so a failed split cannot block the feeder forever
+                    while (!split.inputQueue.offer(record, 1, TimeUnit.SECONDS)) {
+                        if (split.exception != null)
+                            return true; // got some error
+                    }
+
+                    if (record == null || record.isEmpty()) {
+                        return true;
+                    }
+                }
+                return false;
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Signals the end of input to the split, then waits until all its cuboids are built
+         * (their output may still be pending) or the split thread has ended.
+         */
+        private void cutSplit(SplitThread last) {
+            try {
+                // signal the end of input
+                while (last.isAlive()) {
+                    if (last.inputQueue.offer(Collections.<String> emptyList())) {
+                        break;
+                    }
+                    Thread.sleep(1000);
+                }
+
+                // wait cuboid build done (but still pending output)
+                while (last.isAlive()) {
+                    if (last.builder.isAllCuboidDone()) {
+                        break;
+                    }
+                    Thread.sleep(1000);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** True when the system available memory has dropped to the reserved amount. */
+        private boolean shouldCutSplit() {
+            return MemoryBudgetController.getSystemAvailMB() <= reserveMemoryMB;
+        }
+    }
+
+    /** Builds one split of the input with a dedicated {@link InMemCubeBuilder}, in its own thread. */
+    private class SplitThread extends Thread {
+        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(64);
+        // NOTE(review): not used yet; presumably intended for the merger
+        final BlockingQueue<Pair<Long, GTRecord>> outputQueue = new ArrayBlockingQueue<Pair<Long, GTRecord>>(64);
+        final InMemCubeBuilder builder;
+        final MergeSlot output;
+
+        // written by this thread and polled by the feeding thread, hence volatile
+        volatile RuntimeException exception;
+
+        public SplitThread(MergeSlot output) {
+            this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
+            this.builder.setConcurrentThreads(taskThreadCount);
+            this.builder.setOutputOrder(true); // sort merge requires order
+            this.builder.setReserveMemoryMB(reserveMemoryMB);
+
+            this.output = output;
+        }
+
+        @Override
+        public void run() {
+            try {
+                builder.build(inputQueue, output);
+            } catch (Exception e) {
+                if (e instanceof RuntimeException)
+                    this.exception = (RuntimeException) e;
+                else
+                    this.exception = new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Merges the outputs of all splits into the final output. */
+    private class Merger {
+
+        public MergeSlot newMergeSlot() {
+            return new MergeSlot();
+        }
+
+        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) {
+            // TODO
+        }
+    }
+
+    /**
+     * Hands the records of one split over to the merger, one at a time. The queue holds a single
+     * element: the first put delivers the record, the second put blocks until the merger took it.
+     */
+    private static class MergeSlot implements ICuboidWriter {
+
+        final BlockingQueue<MergeSlot> queue = new ArrayBlockingQueue<MergeSlot>(1);
+        long cuboidId;
+        GTRecord record;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            this.cuboidId = cuboidId;
+            this.record = record;
+
+            try {
+                // deliver the record
+                queue.put(this);
+
+                // confirm merger consumed (took) the record
+                queue.put(this);
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index b9c48aa..d9a0b8f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kylin.job.inmemcubing;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -48,19 +49,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
*/
-public class InMemCubeBuilder implements Runnable {
+public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
private static final LongWritable ONE = new LongWritable(1l);
- private final BlockingQueue<List<String>> inputQueue;
- private final ICuboidWriter outputWriter;
-
- private final CubeDesc cubeDesc;
private final long baseCuboidId;
private final CuboidScheduler cuboidScheduler;
- private final Map<TblColRef, Dictionary<?>> dictionaryMap;
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
@@ -70,7 +68,6 @@ public class InMemCubeBuilder implements Runnable {
private final int measureCount;
private MemoryBudgetController memBudget;
- private int taskThreadCount = 4;
private Thread[] taskThreads;
private Throwable[] taskThreadExceptions;
private TreeSet<CuboidTask> taskPending;
@@ -78,19 +75,12 @@ public class InMemCubeBuilder implements Runnable {
private OutputThread outputThread;
private int outputCuboidExpected;
- private boolean outputOrderRequired;
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
- public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
- if (dictionaryMap == null || dictionaryMap.isEmpty()) {
- throw new IllegalArgumentException("dictionary cannot be empty");
- }
- this.inputQueue = queue;
- this.cubeDesc = cubeDesc;
+ public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ super(cubeDesc, dictionaryMap);
this.cuboidScheduler = new CuboidScheduler(cubeDesc);
- this.dictionaryMap = dictionaryMap;
- this.outputWriter = gtRecordWriter;
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
@@ -136,18 +126,10 @@ public class InMemCubeBuilder implements Runnable {
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
}
- public void setConcurrentThreads(int n) {
- this.taskThreadCount = n;
- }
-
- public void setOutputOrder(boolean required) {
- this.outputOrderRequired = required;
- }
-
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
- // Before several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+ // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
// MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
// MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
ConcurrentDiskStore store = new ConcurrentDiskStore(info);
@@ -166,17 +148,7 @@ public class InMemCubeBuilder implements Runnable {
}
@Override
- public void run() {
- try {
- build();
- } catch (IOException e) {
- logger.error("Fail to build cube", e);
- throw new RuntimeException(e);
- }
-
- }
-
- public void build() throws IOException {
+ public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
@@ -187,11 +159,12 @@ public class InMemCubeBuilder implements Runnable {
taskThreadExceptions = new Throwable[taskThreadCount];
// output goes in a separate thread to leverage any async-ness
- outputThread = new OutputThread();
+ outputThread = new OutputThread(output);
outputCuboidExpected = outputThread.getOutputCuboidExpected();
// build base cuboid
- baseResult = createBaseCuboid();
+ totalSumForSanityCheck = null;
+ baseResult = createBaseCuboid(input);
taskCuboidCompleted.incrementAndGet();
if (baseResult.nRows == 0)
return;
@@ -213,11 +186,21 @@ public class InMemCubeBuilder implements Runnable {
throwExceptionIfAny();
}
+
+ /**
+  * Interrupts the task threads and the output thread of this builder, e.g. when a sibling
+  * split of a DoggedCubeBuilder has failed.
+  */
+ public void abort() {
+ interrupt(taskThreads);
+ interrupt(outputThread);
+ }
+ /** Starts each of the given threads. */
private void start(Thread... threads) {
for (Thread t : threads)
t.start();
}
+
+    /**
+     * Interrupts each given thread. Null arrays and null elements are skipped, since abort() may
+     * run before build() has created the task threads or the output thread.
+     */
+    private void interrupt(Thread... threads) {
+        if (threads == null)
+            return;
+        for (Thread t : threads) {
+            if (t != null)
+                t.interrupt();
+        }
+    }
private void join(Thread... threads) throws IOException {
try {
@@ -261,6 +244,10 @@ public class InMemCubeBuilder implements Runnable {
return result;
}
+    /**
+     * Returns whether all expected cuboids of the current build have been computed. Uses
+     * {@code >=} so that {@code !isAllCuboidDone()} keeps the {@code completed < expected}
+     * semantics of the worker loop conditions it replaces, even if the counter overshoots.
+     */
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() >= outputCuboidExpected;
+    }
+
private class CuboidTaskThread extends Thread {
private int id;
@@ -272,7 +259,7 @@ public class InMemCubeBuilder implements Runnable {
@Override
public void run() {
try {
- while (taskCuboidCompleted.get() < outputCuboidExpected) {
+ while (!isAllCuboidDone()) {
CuboidTask task = null;
synchronized (taskPending) {
while (task == null && taskHasNoException()) {
@@ -291,7 +278,7 @@ public class InMemCubeBuilder implements Runnable {
task.parent.markOneSpanningDone();
taskCuboidCompleted.incrementAndGet();
- if (taskCuboidCompleted.get() == outputCuboidExpected) {
+ if (isAllCuboidDone()) {
for (Thread t : taskThreads) {
if (t != Thread.currentThread())
t.interrupt();
@@ -299,7 +286,7 @@ public class InMemCubeBuilder implements Runnable {
}
}
} catch (Throwable ex) {
- if (taskCuboidCompleted.get() < outputCuboidExpected) {
+ if (!isAllCuboidDone()) {
logger.error("task thread exception", ex);
taskThreadExceptions[id] = ex;
}
@@ -339,7 +326,7 @@ public class InMemCubeBuilder implements Runnable {
private void makeMemoryBudget() {
int systemAvailMB = getSystemAvailMB();
logger.info("System avail " + systemAvailMB + " MB");
- int reserve = Math.max(100, baseResult.aggrCacheMB / 3);
+ int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3);
logger.info("Reserve " + reserve + " MB for system basics");
int budget = systemAvailMB - reserve;
@@ -350,15 +337,13 @@ public class InMemCubeBuilder implements Runnable {
}
logger.info("Memory Budget is " + budget + " MB");
- if (budget > 0) {
- memBudget = new MemoryBudgetController(budget);
- }
+ memBudget = new MemoryBudgetController(budget);
}
- private CuboidResult createBaseCuboid() throws IOException {
+ private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
GTBuilder baseBuilder = baseCuboid.rebuild();
- IGTScanner baseInput = new InputConverter(baseCuboid.getInfo());
+ IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
int mbBefore = getSystemAvailMB();
int mbAfter = 0;
@@ -384,7 +369,7 @@ public class InMemCubeBuilder implements Runnable {
long timeSpent = System.currentTimeMillis() - startTime;
logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
- int mbBaseAggrCacheOnHeap = mbBefore - mbAfter;
+ int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least
@@ -503,6 +488,7 @@ public class InMemCubeBuilder implements Runnable {
builder.write(newRecord);
}
+ // disable sanity check for performance
sanityCheck(scanner.getTotalSumForSanityCheck());
} finally {
scanner.close();
@@ -515,6 +501,7 @@ public class InMemCubeBuilder implements Runnable {
return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
}
+ //@SuppressWarnings("unused")
private void sanityCheck(Object[] totalSum) {
// double sum introduces error and causes result not exactly equal
for (int i = 0; i < totalSum.length; i++) {
@@ -599,14 +586,16 @@ public class InMemCubeBuilder implements Runnable {
// ============================================================================
private class OutputThread extends Thread {
+ private ICuboidWriter output;
private SortedMap<Long, Long> outputSequence; // synchronized sorted map
private LinkedBlockingDeque<CuboidResult> outputPending;
private int outputCount;
private int outputCuboidExpected;
private Throwable outputThreadException;
- OutputThread() {
+ OutputThread(ICuboidWriter output) {
super("CuboidOutput");
+ this.output = output;
this.outputSequence = prepareOutputSequence();
this.outputPending = new LinkedBlockingDeque<CuboidResult>();
this.outputCount = 0;
@@ -696,7 +685,7 @@ public class InMemCubeBuilder implements Runnable {
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
IGTScanner scanner = gridTable.scan(req);
for (GTRecord record : scanner) {
- outputWriter.write(cuboidId, record);
+ output.write(cuboidId, record);
}
scanner.close();
logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
@@ -720,9 +709,11 @@ public class InMemCubeBuilder implements Runnable {
private class InputConverter implements IGTScanner {
GTInfo info;
GTRecord record;
+ BlockingQueue<List<String>> input;
- public InputConverter(GTInfo info) {
+ public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
this.info = info;
+ this.input = input;
this.record = new GTRecord(info);
}
@@ -735,7 +726,7 @@ public class InMemCubeBuilder implements Runnable {
@Override
public boolean hasNext() {
try {
- currentObject = inputQueue.take();
+ currentObject = input.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 98435cb..0508a4e 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -111,9 +111,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
final HTableInterface hTable = createHTable(cubeSegment);
final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(), dictionaryMap, gtRecordWriter);
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
- executorService.submit(inMemCubeBuilder).get();
+ executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
gtRecordWriter.flush();
commitSegment(cubeSegment);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 63e303d..34e37f2 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -77,22 +77,39 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
public void test() throws Exception {
final int inputRows = 70000;
final int threads = 4;
-
+
final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
cubeBuilder.setConcurrentThreads(threads);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cubeBuilder);
-
- feedData(cube, flatTable, queue, inputRows);
try {
- future.get();
+ // round 1
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, inputRows);
+ future.get();
+ }
+
+ // round 2, zero input
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, 0);
+ future.get();
+ }
+
+ // round 3
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, inputRows);
+ future.get();
+ }
+
} catch (Exception e) {
logger.error("stream build failed", e);
throw new IOException("Failed to build cube ", e);