You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/24 12:24:44 UTC
[2/2] incubator-kylin git commit: KYLIN-803 seems all good
KYLIN-803 seems all good
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3c0677c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3c0677c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3c0677c5
Branch: refs/heads/KYLIN-803
Commit: 3c0677c5b2e9fddd4138bb61f869b243cc64c7f5
Parents: ba9dfed
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Jun 24 18:24:03 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jun 24 18:24:03 2015 +0800
----------------------------------------------------------------------
.../inmemcubing/AbstractInMemCubeBuilder.java | 55 +----
.../job/inmemcubing/DoggedCubeBuilder.java | 233 ++++++++++---------
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 43 +++-
.../job/inmemcubing/DoggedCubeBuilderTest.java | 4 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 2 +-
5 files changed, 166 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3c0677c5/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
index cf7c356..b05ad32 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -19,7 +19,6 @@ package org.apache.kylin.job.inmemcubing;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import org.apache.kylin.cube.model.CubeDesc;
@@ -37,8 +36,8 @@ import org.slf4j.LoggerFactory;
*/
abstract public class AbstractInMemCubeBuilder {
- private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder .class);
-
+ private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+
final protected CubeDesc cubeDesc;
final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
@@ -60,10 +59,6 @@ abstract public class AbstractInMemCubeBuilder {
this.taskThreadCount = n;
}
- public void setOutputOrder(boolean required) {
- this.outputOrderRequired = required;
- }
-
public void setReserveMemoryMB(int mb) {
this.reserveMemoryMB = mb;
}
@@ -81,15 +76,9 @@ abstract public class AbstractInMemCubeBuilder {
};
}
- public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
- TreeMap<Long, CuboidResult> result = build(input);
- for (CuboidResult cuboidResult : result.values()) {
- outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
- cuboidResult.table.close();
- }
- }
-
- private void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+ abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+
+ protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
long startTime = System.currentTimeMillis();
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
IGTScanner scanner = gridTable.scan(req);
@@ -100,39 +89,5 @@ abstract public class AbstractInMemCubeBuilder {
logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
- public TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
- final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
- ICuboidCollector collector = new ICuboidCollector() {
- @Override
- public void collect(CuboidResult cuboidResult) {
- result.put(cuboidResult.cuboidId, cuboidResult);
- }
- };
- build(input, collector);
- return result;
- }
-
- abstract public void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException;
-
- public static interface ICuboidCollector {
- public void collect(CuboidResult result);
- }
-
- public static class CuboidResult {
- public long cuboidId;
- public GridTable table;
- public int nRows;
- public long timeSpent;
- public int aggrCacheMB;
-
- public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
- this.cuboidId = cuboidId;
- this.table = table;
- this.nRows = nRows;
- this.timeSpent = timeSpent;
- this.aggrCacheMB = aggrCacheMB;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3c0677c5/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index dc7c695..86fb75d 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -19,10 +19,12 @@ package org.apache.kylin.job.inmemcubing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -31,11 +33,14 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
import org.apache.kylin.metadata.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,34 +72,58 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private class BuildOnce {
- final List<SplitThread> splits = new ArrayList<SplitThread>();
- final Merger merger = new Merger();
-
public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
- SplitThread last = null;
- boolean eof = false;
+ final List<SplitThread> splits = new ArrayList<SplitThread>();
+ final Merger merger = new Merger();
+
+ try {
+ SplitThread last = null;
+ boolean eof = false;
+
+ while (!eof) {
+
+ if (last != null && shouldCutSplit(splits)) {
+ cutSplit(last);
+ last = null;
+ }
+
+ checkException(splits);
- while (!eof) {
+ if (last == null) {
+ last = new SplitThread();
+ splits.add(last);
+ last.start();
+ }
- if (last != null && shouldCutSplit()) {
- cutSplit(last);
- last = null;
+ eof = feedSomeInput(input, last, unitRows);
}
+ for (SplitThread split : splits) {
+ split.join();
+ }
checkException(splits);
- if (last == null) {
- last = new SplitThread(merger);
- splits.add(last);
- last.start();
- }
+ merger.mergeAndOutput(splits, output);
- eof = feedSomeInput(input, last, unitRows);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ closeGirdTables(splits);
}
+ }
- merger.mergeAndOutput(splits, output);
-
- checkException(splits);
+ private void closeGirdTables(List<SplitThread> splits) {
+ for (SplitThread split : splits) {
+ if (split.buildResult != null) {
+ for (CuboidResult r : split.buildResult.values()) {
+ try {
+ r.table.close();
+ } catch (Throwable e) {
+ logger.error("Error closing grid table " + r.table, e);
+ }
+ }
+ }
+ }
}
private void checkException(List<SplitThread> splits) throws IOException {
@@ -171,7 +200,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
Thread.sleep(1000);
}
- // wait cuboid build done (but still pending output)
+ // wait cuboid build done
while (last.isAlive()) {
if (last.builder.isAllCuboidDone()) {
break;
@@ -183,7 +212,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private boolean shouldCutSplit() {
+ private boolean shouldCutSplit(List<SplitThread> splits) {
int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
int nSplit = splits.size();
long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
@@ -197,24 +226,21 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private class SplitThread extends Thread {
final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
final InMemCubeBuilder builder;
- final MergeSlot output;
+ TreeMap<Long, CuboidResult> buildResult;
long inputRowCount = 0;
RuntimeException exception;
- public SplitThread(Merger merger) {
+ public SplitThread() {
this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
this.builder.setConcurrentThreads(taskThreadCount);
- this.builder.setOutputOrder(true); // merge sort requires order
this.builder.setReserveMemoryMB(reserveMemoryMB);
-
- this.output = merger.newMergeSlot(this);
}
@Override
public void run() {
try {
- builder.build(inputQueue, output);
+ buildResult = builder.build(inputQueue);
} catch (Exception e) {
if (e instanceof RuntimeException)
this.exception = (RuntimeException) e;
@@ -239,74 +265,55 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
reuseMetricsArray = new Object[measures.length];
}
- public MergeSlot newMergeSlot(SplitThread split) {
- return new MergeSlot(split);
- }
-
public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
- LinkedList<MergeSlot> open = Lists.newLinkedList();
- for (SplitThread split : splits)
- open.add(split.output);
-
if (splits.size() == 1) {
- splits.get(0).output.directOutput = output;
- }
-
- try {
- PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
- boolean hasMore = true;
-
- while (hasMore) {
- takeRecordsFromAllOpenSlots(open, heap);
- hasMore = mergeAndOutputOneRecord(heap, open, output);
+ for (CuboidResult cuboidResult : splits.get(0).buildResult.values()) {
+ outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+ cuboidResult.table.close();
}
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ return;
}
- }
- private void takeRecordsFromAllOpenSlots(LinkedList<MergeSlot> open, PriorityQueue<MergeSlot> heap) throws InterruptedException {
- while (!open.isEmpty()) {
- MergeSlot slot = open.getFirst();
- // ready one record in the slot
- if (slot.readySignal.poll(1, TimeUnit.SECONDS) != null) {
- open.removeFirst();
- heap.add(slot);
- } else if (slot.isClosed()) {
- open.removeFirst();
- }
+ LinkedList<MergeSlot> open = Lists.newLinkedList();
+ for (SplitThread split : splits) {
+ open.add(new MergeSlot(split));
}
- return;
- }
- private boolean mergeAndOutputOneRecord(PriorityQueue<MergeSlot> heap, LinkedList<MergeSlot> open, ICuboidWriter output) throws IOException, InterruptedException {
- MergeSlot smallest = heap.poll();
- if (smallest == null)
- return false;
- open.add(smallest);
-
- if (smallest.isSameKey(heap.peek())) {
- Object[] metrics = getMetricsValues(smallest.record);
- reuseAggrs.reset();
- reuseAggrs.aggregate(metrics);
- do {
- MergeSlot slot = heap.poll();
- open.add(slot);
- metrics = getMetricsValues(slot.record);
- reuseAggrs.aggregate(metrics);
- } while (smallest.isSameKey(heap.peek()));
+ PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
- reuseAggrs.collectStates(metrics);
- setMetricsValues(smallest.record, metrics);
- }
+ while (true) {
+ // ready records in open slots and add to heap
+ while (!open.isEmpty()) {
+ MergeSlot slot = open.removeFirst();
+ if (slot.fetchNext()) {
+ heap.add(slot);
+ }
+ }
+
+ // find the smallest on heap
+ MergeSlot smallest = heap.poll();
+ if (smallest == null)
+ break;
+ open.add(smallest);
- output.write(smallest.cuboidId, smallest.record);
+ // merge with slots having the same key
+ if (smallest.isSameKey(heap.peek())) {
+ Object[] metrics = getMetricsValues(smallest.currentRecord);
+ reuseAggrs.reset();
+ reuseAggrs.aggregate(metrics);
+ do {
+ MergeSlot slot = heap.poll();
+ open.add(slot);
+ metrics = getMetricsValues(slot.currentRecord);
+ reuseAggrs.aggregate(metrics);
+ } while (smallest.isSameKey(heap.peek()));
+
+ reuseAggrs.collectStates(metrics);
+ setMetricsValues(smallest.currentRecord, metrics);
+ }
- for (MergeSlot slot : open) {
- slot.consumedSignal.put(this);
+ output.write(smallest.currentCuboidId, smallest.currentRecord);
}
- return true;
}
private void setMetricsValues(GTRecord record, Object[] metricsValues) {
@@ -337,58 +344,52 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private static class MergeSlot implements ICuboidWriter, Comparable<MergeSlot> {
+ private static class MergeSlot implements Comparable<MergeSlot> {
- final SplitThread split;
- final BlockingQueue<Object> readySignal = new ArrayBlockingQueue<Object>(1);
- final BlockingQueue<Object> consumedSignal = new ArrayBlockingQueue<Object>(1);
+ final Iterator<CuboidResult> cuboidIterator;
+ IGTScanner scanner;
+ Iterator<GTRecord> recordIterator;
- ICuboidWriter directOutput = null;
- long cuboidId;
- GTRecord record;
+ long currentCuboidId;
+ GTRecord currentRecord;
public MergeSlot(SplitThread split) {
- this.split = split;
+ cuboidIterator = split.buildResult.values().iterator();
}
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- // when only one split left
- if (directOutput != null) {
- directOutput.write(cuboidId, record);
- return;
+ public boolean fetchNext() throws IOException {
+ if (recordIterator == null) {
+ if (cuboidIterator.hasNext()) {
+ CuboidResult cuboid = cuboidIterator.next();
+ currentCuboidId = cuboid.cuboidId;
+ scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+ recordIterator = scanner.iterator();
+ } else {
+ return false;
+ }
}
- this.cuboidId = cuboidId;
- this.record = record;
-
- try {
- // signal record is ready
- readySignal.put(this);
-
- // wait record be consumed
- consumedSignal.take();
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ if (recordIterator.hasNext()) {
+ currentRecord = recordIterator.next();
+ return true;
+ } else {
+ scanner.close();
+ recordIterator = null;
+ return fetchNext();
}
}
- public boolean isClosed() {
- return split.isAlive() == false;
- }
-
@Override
public int compareTo(MergeSlot o) {
- long cuboidComp = this.cuboidId - o.cuboidId;
+ long cuboidComp = this.currentCuboidId - o.currentCuboidId;
if (cuboidComp != 0)
return cuboidComp < 0 ? -1 : 1;
// note GTRecord.equals() doesn't work because the two GTRecords come from different GridTables
- ImmutableBitSet pk = this.record.getInfo().getPrimaryKey();
+ ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
for (int i = 0; i < pk.trueBitCount(); i++) {
int c = pk.trueBitAt(i);
- int comp = this.record.get(c).compareTo(o.record.get(c));
+ int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
if (comp != 0)
return comp;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3c0677c5/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 996e747..3c3d834 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -23,6 +23,7 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -147,7 +148,47 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
@Override
- public void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
+ public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ TreeMap<Long, CuboidResult> result = build(input);
+ for (CuboidResult cuboidResult : result.values()) {
+ outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+ cuboidResult.table.close();
+ }
+ }
+
+ TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+ final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
+ ICuboidCollector collector = new ICuboidCollector() {
+ @Override
+ public void collect(CuboidResult cuboidResult) {
+ result.put(cuboidResult.cuboidId, cuboidResult);
+ }
+ };
+ build(input, collector);
+ return result;
+ }
+
+ static interface ICuboidCollector {
+ public void collect(CuboidResult result);
+ }
+
+ static class CuboidResult {
+ public long cuboidId;
+ public GridTable table;
+ public int nRows;
+ public long timeSpent;
+ public int aggrCacheMB;
+
+ public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ this.cuboidId = cuboidId;
+ this.table = table;
+ this.nRows = nRows;
+ this.timeSpent = timeSpent;
+ this.aggrCacheMB = aggrCacheMB;
+ }
+ }
+
+ private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3c0677c5/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
index 5c19df3..a87f950 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
@@ -82,10 +82,9 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- long randSeed = 101;
+ long randSeed = System.currentTimeMillis();
DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- doggedBuilder.setOutputOrder(true);
doggedBuilder.setConcurrentThreads(THREADS);
doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
FileRecordWriter doggedResult = new FileRecordWriter();
@@ -98,7 +97,6 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
}
InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- inmemBuilder.setOutputOrder(true);
inmemBuilder.setConcurrentThreads(THREADS);
FileRecordWriter inmemResult = new FileRecordWriter();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3c0677c5/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 2a4cf8a..9600ef7 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -85,7 +85,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
public void test() throws Exception {
InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- cubeBuilder.setOutputOrder(true);
+ //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
cubeBuilder.setConcurrentThreads(THREADS);
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);