You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/25 01:43:41 UTC
incubator-kylin git commit: KYLIN-803 DoggedCubeBuilder that cuts
input into splits and does in-mem builds one by one
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 fe569ac4f -> e124e8f33
KYLIN-803 DoggedCubeBuilder that cuts input into splits and does in-mem builds one by one
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e124e8f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e124e8f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e124e8f3
Branch: refs/heads/0.8
Commit: e124e8f335ef1da22826eb49b70c89d444e3e6de
Parents: fe569ac
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 24 10:39:23 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Jun 25 07:40:10 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/cuboid/CuboidScheduler.java | 14 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 28 ++-
.../job/inmemcubing/DoggedCubeBuilder.java | 238 +++++++++---------
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 243 ++++---------------
.../DoggedCubeBuilderStressTest.java | 95 ++++++++
.../job/inmemcubing/DoggedCubeBuilderTest.java | 4 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 2 +-
.../kylin/storage/gridtable/GridTable.java | 10 +-
8 files changed, 313 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 07be092..bebfd08 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -19,8 +19,6 @@
package org.apache.kylin.cube.cuboid;
/**
- * @author George Song (ysong1)
- *
*/
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +40,18 @@ public class CuboidScheduler {
this.max = (long) Math.pow(2, size) - 1;
this.cache = new ConcurrentHashMap<Long, List<Long>>();
}
+
+ public int getCuboidCount() {
+ return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+ }
+
+ private int getCuboidCount(long cuboidId) {
+ int r = 1;
+ for (Long child : getSpanningCuboid(cuboidId)) {
+ r += getCuboidCount(child);
+ }
+ return r;
+ }
public List<Long> getSpanningCuboid(long cuboid) {
if (cuboid > max || cuboid < 0) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
index 034c4cd..0ff7767 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -24,17 +24,24 @@ import java.util.concurrent.BlockingQueue;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An interface alike abstract class. Hold common tunable parameters and nothing more.
*/
abstract public class AbstractInMemCubeBuilder {
+ private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+
final protected CubeDesc cubeDesc;
final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
protected int taskThreadCount = 4;
- protected boolean outputOrderRequired = false;
protected int reserveMemoryMB = 100;
public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
@@ -51,10 +58,6 @@ abstract public class AbstractInMemCubeBuilder {
this.taskThreadCount = n;
}
- public void setOutputOrder(boolean required) {
- this.outputOrderRequired = required;
- }
-
public void setReserveMemoryMB(int mb) {
this.reserveMemoryMB = mb;
}
@@ -72,5 +75,18 @@ abstract public class AbstractInMemCubeBuilder {
};
}
- abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter gtRecordWriter) throws IOException;
+ abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+
+ protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+ long startTime = System.currentTimeMillis();
+ GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+ IGTScanner scanner = gridTable.scan(req);
+ for (GTRecord record : scanner) {
+ output.write(cuboidId, record);
+ }
+ scanner.close();
+ logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index dc7c695..b0c3d5c 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -19,10 +19,12 @@ package org.apache.kylin.job.inmemcubing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -31,11 +33,14 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
import org.apache.kylin.metadata.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,34 +72,63 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private class BuildOnce {
- final List<SplitThread> splits = new ArrayList<SplitThread>();
- final Merger merger = new Merger();
-
public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
- SplitThread last = null;
- boolean eof = false;
+ final List<SplitThread> splits = new ArrayList<SplitThread>();
+ final Merger merger = new Merger();
+
+ long start = System.currentTimeMillis();
+ logger.info("Dogged Cube Build start");
+
+ try {
+ SplitThread last = null;
+ boolean eof = false;
- while (!eof) {
+ while (!eof) {
- if (last != null && shouldCutSplit()) {
- cutSplit(last);
- last = null;
+ if (last != null && shouldCutSplit(splits)) {
+ cutSplit(last);
+ last = null;
+ }
+
+ checkException(splits);
+
+ if (last == null) {
+ last = new SplitThread();
+ splits.add(last);
+ last.start();
+ }
+
+ eof = feedSomeInput(input, last, unitRows);
}
+ for (SplitThread split : splits) {
+ split.join();
+ }
checkException(splits);
+ logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
- if (last == null) {
- last = new SplitThread(merger);
- splits.add(last);
- last.start();
- }
+ merger.mergeAndOutput(splits, output);
- eof = feedSomeInput(input, last, unitRows);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ closeGirdTables(splits);
+ logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
}
+ }
- merger.mergeAndOutput(splits, output);
-
- checkException(splits);
+ private void closeGirdTables(List<SplitThread> splits) {
+ for (SplitThread split : splits) {
+ if (split.buildResult != null) {
+ for (CuboidResult r : split.buildResult.values()) {
+ try {
+ r.table.close();
+ } catch (Throwable e) {
+ logger.error("Error closing grid table " + r.table, e);
+ }
+ }
+ }
+ }
}
private void checkException(List<SplitThread> splits) throws IOException {
@@ -171,7 +205,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
Thread.sleep(1000);
}
- // wait cuboid build done (but still pending output)
+ // wait cuboid build done
while (last.isAlive()) {
if (last.builder.isAllCuboidDone()) {
break;
@@ -183,7 +217,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private boolean shouldCutSplit() {
+ private boolean shouldCutSplit(List<SplitThread> splits) {
int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
int nSplit = splits.size();
long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
@@ -197,24 +231,21 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private class SplitThread extends Thread {
final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
final InMemCubeBuilder builder;
- final MergeSlot output;
+ TreeMap<Long, CuboidResult> buildResult;
long inputRowCount = 0;
RuntimeException exception;
- public SplitThread(Merger merger) {
+ public SplitThread() {
this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
this.builder.setConcurrentThreads(taskThreadCount);
- this.builder.setOutputOrder(true); // merge sort requires order
this.builder.setReserveMemoryMB(reserveMemoryMB);
-
- this.output = merger.newMergeSlot(this);
}
@Override
public void run() {
try {
- builder.build(inputQueue, output);
+ buildResult = builder.build(inputQueue);
} catch (Exception e) {
if (e instanceof RuntimeException)
this.exception = (RuntimeException) e;
@@ -239,74 +270,55 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
reuseMetricsArray = new Object[measures.length];
}
- public MergeSlot newMergeSlot(SplitThread split) {
- return new MergeSlot(split);
- }
-
public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
- LinkedList<MergeSlot> open = Lists.newLinkedList();
- for (SplitThread split : splits)
- open.add(split.output);
-
if (splits.size() == 1) {
- splits.get(0).output.directOutput = output;
- }
-
- try {
- PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
- boolean hasMore = true;
-
- while (hasMore) {
- takeRecordsFromAllOpenSlots(open, heap);
- hasMore = mergeAndOutputOneRecord(heap, open, output);
+ for (CuboidResult cuboidResult : splits.get(0).buildResult.values()) {
+ outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+ cuboidResult.table.close();
}
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ return;
}
- }
- private void takeRecordsFromAllOpenSlots(LinkedList<MergeSlot> open, PriorityQueue<MergeSlot> heap) throws InterruptedException {
- while (!open.isEmpty()) {
- MergeSlot slot = open.getFirst();
- // ready one record in the slot
- if (slot.readySignal.poll(1, TimeUnit.SECONDS) != null) {
- open.removeFirst();
- heap.add(slot);
- } else if (slot.isClosed()) {
- open.removeFirst();
- }
+ LinkedList<MergeSlot> open = Lists.newLinkedList();
+ for (SplitThread split : splits) {
+ open.add(new MergeSlot(split));
}
- return;
- }
- private boolean mergeAndOutputOneRecord(PriorityQueue<MergeSlot> heap, LinkedList<MergeSlot> open, ICuboidWriter output) throws IOException, InterruptedException {
- MergeSlot smallest = heap.poll();
- if (smallest == null)
- return false;
- open.add(smallest);
-
- if (smallest.isSameKey(heap.peek())) {
- Object[] metrics = getMetricsValues(smallest.record);
- reuseAggrs.reset();
- reuseAggrs.aggregate(metrics);
- do {
- MergeSlot slot = heap.poll();
- open.add(slot);
- metrics = getMetricsValues(slot.record);
- reuseAggrs.aggregate(metrics);
- } while (smallest.isSameKey(heap.peek()));
+ PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
- reuseAggrs.collectStates(metrics);
- setMetricsValues(smallest.record, metrics);
- }
+ while (true) {
+ // ready records in open slots and add to heap
+ while (!open.isEmpty()) {
+ MergeSlot slot = open.removeFirst();
+ if (slot.fetchNext()) {
+ heap.add(slot);
+ }
+ }
+
+ // find the smallest on heap
+ MergeSlot smallest = heap.poll();
+ if (smallest == null)
+ break;
+ open.add(smallest);
- output.write(smallest.cuboidId, smallest.record);
+ // merge with slots having the same key
+ if (smallest.isSameKey(heap.peek())) {
+ Object[] metrics = getMetricsValues(smallest.currentRecord);
+ reuseAggrs.reset();
+ reuseAggrs.aggregate(metrics);
+ do {
+ MergeSlot slot = heap.poll();
+ open.add(slot);
+ metrics = getMetricsValues(slot.currentRecord);
+ reuseAggrs.aggregate(metrics);
+ } while (smallest.isSameKey(heap.peek()));
+
+ reuseAggrs.collectStates(metrics);
+ setMetricsValues(smallest.currentRecord, metrics);
+ }
- for (MergeSlot slot : open) {
- slot.consumedSignal.put(this);
+ output.write(smallest.currentCuboidId, smallest.currentRecord);
}
- return true;
}
private void setMetricsValues(GTRecord record, Object[] metricsValues) {
@@ -337,58 +349,52 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private static class MergeSlot implements ICuboidWriter, Comparable<MergeSlot> {
+ private static class MergeSlot implements Comparable<MergeSlot> {
- final SplitThread split;
- final BlockingQueue<Object> readySignal = new ArrayBlockingQueue<Object>(1);
- final BlockingQueue<Object> consumedSignal = new ArrayBlockingQueue<Object>(1);
+ final Iterator<CuboidResult> cuboidIterator;
+ IGTScanner scanner;
+ Iterator<GTRecord> recordIterator;
- ICuboidWriter directOutput = null;
- long cuboidId;
- GTRecord record;
+ long currentCuboidId;
+ GTRecord currentRecord;
public MergeSlot(SplitThread split) {
- this.split = split;
+ cuboidIterator = split.buildResult.values().iterator();
}
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- // when only one split left
- if (directOutput != null) {
- directOutput.write(cuboidId, record);
- return;
+ public boolean fetchNext() throws IOException {
+ if (recordIterator == null) {
+ if (cuboidIterator.hasNext()) {
+ CuboidResult cuboid = cuboidIterator.next();
+ currentCuboidId = cuboid.cuboidId;
+ scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+ recordIterator = scanner.iterator();
+ } else {
+ return false;
+ }
}
- this.cuboidId = cuboidId;
- this.record = record;
-
- try {
- // signal record is ready
- readySignal.put(this);
-
- // wait record be consumed
- consumedSignal.take();
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ if (recordIterator.hasNext()) {
+ currentRecord = recordIterator.next();
+ return true;
+ } else {
+ scanner.close();
+ recordIterator = null;
+ return fetchNext();
}
}
- public boolean isClosed() {
- return split.isAlive() == false;
- }
-
@Override
public int compareTo(MergeSlot o) {
- long cuboidComp = this.cuboidId - o.cuboidId;
+ long cuboidComp = this.currentCuboidId - o.currentCuboidId;
if (cuboidComp != 0)
return cuboidComp < 0 ? -1 : 1;
// note GTRecord.equals() don't work because the two GTRecord comes from different GridTable
- ImmutableBitSet pk = this.record.getInfo().getPrimaryKey();
+ ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
for (int i = 0; i < pk.trueBitCount(); i++) {
int c = pk.trueBitAt(i);
- int comp = this.record.get(c).compareTo(o.record.get(c));
+ int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
if (comp != 0)
return comp;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 8d0b0fb..3c3d834 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,21 +16,16 @@
*/
package org.apache.kylin.job.inmemcubing;
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.DoubleWritable;
@@ -57,7 +52,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.GridTable;
import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.IGTStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +67,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
private static final LongWritable ONE = new LongWritable(1l);
- private final long baseCuboidId;
private final CuboidScheduler cuboidScheduler;
+ private final long baseCuboidId;
+ private final int totalCuboidCount;
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
@@ -88,15 +83,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private TreeSet<CuboidTask> taskPending;
private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
- private OutputThread outputThread;
- private int outputCuboidExpected;
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
+ private ICuboidCollector resultCollector;
public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
super(cubeDesc, dictionaryMap);
this.cuboidScheduler = new CuboidScheduler(cubeDesc);
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ this.totalCuboidCount = cuboidScheduler.getCuboidCount();
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
@@ -154,6 +149,46 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
@Override
public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ TreeMap<Long, CuboidResult> result = build(input);
+ for (CuboidResult cuboidResult : result.values()) {
+ outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+ cuboidResult.table.close();
+ }
+ }
+
+ TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+ final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
+ ICuboidCollector collector = new ICuboidCollector() {
+ @Override
+ public void collect(CuboidResult cuboidResult) {
+ result.put(cuboidResult.cuboidId, cuboidResult);
+ }
+ };
+ build(input, collector);
+ return result;
+ }
+
+ static interface ICuboidCollector {
+ public void collect(CuboidResult result);
+ }
+
+ static class CuboidResult {
+ public long cuboidId;
+ public GridTable table;
+ public int nRows;
+ public long timeSpent;
+ public int aggrCacheMB;
+
+ public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ this.cuboidId = cuboidId;
+ this.table = table;
+ this.nRows = nRows;
+ this.timeSpent = timeSpent;
+ this.aggrCacheMB = aggrCacheMB;
+ }
+ }
+
+ private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
@@ -163,14 +198,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
taskThreads = prepareTaskThreads();
taskThreadExceptions = new Throwable[taskThreadCount];
- // output goes in a separate thread to leverage any async-ness
- outputThread = new OutputThread(output);
- outputCuboidExpected = outputThread.getOutputCuboidExpected();
-
// build base cuboid
+ resultCollector = collector;
totalSumForSanityCheck = null;
baseResult = createBaseCuboid(input);
- taskCuboidCompleted.incrementAndGet();
if (baseResult.nRows == 0)
return;
@@ -180,11 +211,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
// kick off N-D cuboid tasks and output
addChildTasks(baseResult);
start(taskThreads);
- start(outputThread);
// wait complete
join(taskThreads);
- join(outputThread);
long endTime = System.currentTimeMillis();
logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
@@ -194,7 +223,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
public void abort() {
interrupt(taskThreads);
- interrupt(outputThread);
}
private void start(Thread... threads) {
@@ -223,9 +251,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (t != null)
errors.add(t);
}
- if (outputThread.getException() != null) {
- errors.add(outputThread.getException());
- }
if (errors.isEmpty()) {
return;
} else if (errors.size() == 1) {
@@ -250,7 +275,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
public boolean isAllCuboidDone() {
- return taskCuboidCompleted.get() == outputCuboidExpected;
+ return taskCuboidCompleted.get() == totalCuboidCount;
}
private class CuboidTaskThread extends Thread {
@@ -280,8 +305,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
addChildTasks(newCuboid);
- task.parent.markOneSpanningDone();
- taskCuboidCompleted.incrementAndGet();
if (isAllCuboidDone()) {
for (Thread t : taskThreads) {
@@ -387,8 +410,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (aggrCacheMB <= 0) {
aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
}
+
CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
- outputThread.addOutput(result);
+ taskCuboidCompleted.incrementAndGet();
+
+ resultCollector.collect(result);
return result;
}
@@ -397,7 +423,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
@Override
public int freeUp(int mb) {
- return 0; // cannot free up
+ return 0; // cannot free up on demand
}
@Override
@@ -493,17 +519,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private void closeStore(GridTable gt) {
- IGTStore store = gt.getStore();
- if (store instanceof Closeable) {
- try {
- ((Closeable) store).close();
- } catch (IOException e) {
- logger.warn("Close " + store + " exception", e);
- }
- }
- }
-
// ===========================================================================
private static class CuboidTask implements Comparable<CuboidTask> {
@@ -522,162 +537,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private class CuboidResult {
- long cuboidId;
- GridTable table;
- int nRows;
- @SuppressWarnings("unused")
- long timeSpent;
- int aggrCacheMB;
- boolean outputDone;
- int spanningDone;
-
- public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
- this.cuboidId = cuboidId;
- this.table = table;
- this.nRows = nRows;
- this.timeSpent = timeSpent;
- this.aggrCacheMB = aggrCacheMB;
- }
-
- synchronized void markOutputDone() {
- outputDone = true;
- closeIfAllDone();
- }
-
- synchronized void markOneSpanningDone() {
- spanningDone++;
- closeIfAllDone();
- }
-
- private void closeIfAllDone() {
- if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) {
- closeStore(table);
- }
- }
- }
-
- // ============================================================================
-
- private class OutputThread extends Thread {
- private ICuboidWriter output;
- private SortedMap<Long, Long> outputSequence; // synchronized sorted map
- private LinkedBlockingDeque<CuboidResult> outputPending;
- private int outputCount;
- private int outputCuboidExpected;
- private Throwable outputThreadException;
-
- OutputThread(ICuboidWriter output) {
- super("CuboidOutput");
- this.output = output;
- this.outputSequence = prepareOutputSequence();
- this.outputPending = new LinkedBlockingDeque<CuboidResult>();
- this.outputCount = 0;
- this.outputCuboidExpected = outputSequence.size();
-
- if (outputOrderRequired == false)
- outputSequence = null;
- }
-
- public int getOutputCuboidExpected() {
- return outputCuboidExpected;
- }
-
- private SortedMap<Long, Long> prepareOutputSequence() {
- TreeMap<Long, Long> result = new TreeMap<Long, Long>();
- prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
- return Collections.synchronizedSortedMap(result);
- }
-
- private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
- result.put(cuboidId, cuboidId);
- for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
- prepareOutputPendingRecursive(child, result);
- }
- }
-
- public void addOutput(CuboidResult result) {
- // if output is NOT ordered
- if (outputSequence == null) {
- outputPending.addLast(result);
- }
- // if output is ordered
- else {
- Long cuboidId = outputSequence.get(result.cuboidId);
- synchronized (cuboidId) {
- outputPending.addFirst(result);
- cuboidId.notify();
- }
- }
- }
-
- private CuboidResult nextOutput() throws InterruptedException {
- CuboidResult result = null;
-
- // if output is NOT ordered
- if (outputSequence == null) {
- while (result == null && taskHasNoException()) {
- result = outputPending.pollFirst(60, TimeUnit.SECONDS);
- }
- }
- // if output is ordered
- else {
- Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
- synchronized (nextCuboidId) {
- while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
- nextCuboidId.wait(60000);
- }
- }
- outputSequence.remove(result.cuboidId);
- }
-
- return result;
- }
-
- @Override
- public void run() {
- try {
- while (outputCount < outputCuboidExpected) {
- CuboidResult result = nextOutput();
-
- // if task error occurs
- if (result == null || result.table == null)
- break;
-
- outputCuboid(result.cuboidId, result.table);
- outputCount++;
- result.markOutputDone();
- }
- } catch (Throwable ex) {
- logger.error("output thread exception", ex);
- outputThreadException = ex;
- }
- }
-
- private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- output.write(cuboidId, record);
- }
- scanner.close();
- logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
- }
-
- private CuboidResult findPendingOutput(Long cuboidId) {
- for (CuboidResult r : outputPending) {
- if (r.cuboidId == cuboidId)
- return r;
- }
- return null;
- }
-
- public Throwable getException() {
- return outputThreadException;
- }
- }
-
// ============================================================================
private class InputConverter implements IGTScanner {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
new file mode 100644
index 0000000..d5563b7
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
+
+ // CI sandbox memory is no more than 512MB, this many input should hit memory threshold
+ private static final int INPUT_ROWS = 200000;
+ private static final int THREADS = 4;
+
+ private static CubeInstance cube;
+ private static String flatTable;
+ private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+ @BeforeClass
+ public static void before() throws IOException {
+ staticCreateTestMetadata();
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+ flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+ dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ staticCleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ long randSeed = System.currentTimeMillis();
+
+ DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ doggedBuilder.setConcurrentThreads(THREADS);
+
+ {
+ Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
+ InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ }
+ }
+
+ class NoopWriter implements ICuboidWriter {
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
index 5c19df3..a87f950 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
@@ -82,10 +82,9 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- long randSeed = 101;
+ long randSeed = System.currentTimeMillis();
DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- doggedBuilder.setOutputOrder(true);
doggedBuilder.setConcurrentThreads(THREADS);
doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
FileRecordWriter doggedResult = new FileRecordWriter();
@@ -98,7 +97,6 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
}
InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- inmemBuilder.setOutputOrder(true);
inmemBuilder.setConcurrentThreads(THREADS);
FileRecordWriter inmemResult = new FileRecordWriter();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 2a4cf8a..9600ef7 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -85,7 +85,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
public void test() throws Exception {
InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- cubeBuilder.setOutputOrder(true);
+ //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
cubeBuilder.setConcurrentThreads(THREADS);
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index 20b543a..092227b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -1,8 +1,9 @@
package org.apache.kylin.storage.gridtable;
+import java.io.Closeable;
import java.io.IOException;
-public class GridTable {
+public class GridTable implements Closeable {
final GTInfo info;
final IGTStore store;
@@ -50,4 +51,11 @@ public class GridTable {
/** Returns the {@code IGTStore} held in this table's {@code store} field. */
public IGTStore getStore() {
return store;
}
+
+ /**
+  * Closes the backing store when it implements {@link Closeable}; otherwise does nothing.
+  *
+  * @throws IOException if the store's own {@code close()} throws it
+  */
+ @Override
+ public void close() throws IOException {
+     if (!(store instanceof Closeable)) {
+         return;
+     }
+     ((Closeable) store).close();
+ }
}