You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/24 04:39:53 UTC
incubator-kylin git commit: KYLIN-803 half way
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-803 [created] 0d5398853
KYLIN-803 half way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0d539885
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0d539885
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0d539885
Branch: refs/heads/KYLIN-803
Commit: 0d539885343b3a2b7f64e2bd9787cb47deb8f134
Parents: d020169
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 24 10:39:23 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jun 24 10:39:23 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/cuboid/CuboidScheduler.java | 14 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 64 +++++-
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 206 ++-----------------
.../kylin/storage/gridtable/GridTable.java | 10 +-
4 files changed, 96 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 07be092..bebfd08 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -19,8 +19,6 @@
package org.apache.kylin.cube.cuboid;
/**
- * @author George Song (ysong1)
- *
*/
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +40,18 @@ public class CuboidScheduler {
this.max = (long) Math.pow(2, size) - 1;
this.cache = new ConcurrentHashMap<Long, List<Long>>();
}
+
+    /**
+     * Counts the cuboids reachable from the base cuboid through the spanning tree, base included.
+     */
+    public int getCuboidCount() {
+        return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+    }
+
+    /**
+     * Counts {@code cuboidId} plus all of its descendants in the spanning tree, walking the tree
+     * with an explicit stack instead of recursion.
+     */
+    private int getCuboidCount(long cuboidId) {
+        Deque<Long> toVisit = new ArrayDeque<Long>();
+        toVisit.push(cuboidId);
+        int visited = 0;
+        while (!toVisit.isEmpty()) {
+            long current = toVisit.pop();
+            visited++;
+            for (Long child : getSpanningCuboid(current)) {
+                toVisit.push(child);
+            }
+        }
+        return visited;
+    }
public List<Long> getSpanningCuboid(long cuboid) {
if (cuboid > max || cuboid < 0) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
index 034c4cd..cf7c356 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -19,17 +19,26 @@ package org.apache.kylin.job.inmemcubing;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An interface alike abstract class. Hold common tunable parameters and nothing more.
*/
abstract public class AbstractInMemCubeBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+
final protected CubeDesc cubeDesc;
final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
@@ -72,5 +81,58 @@ abstract public class AbstractInMemCubeBuilder {
};
}
- abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter gtRecordWriter) throws IOException;
+    /**
+     * Builds every cuboid from {@code input} and writes them to {@code output} in ascending
+     * cuboid-ID order (the order of the {@link TreeMap} returned by {@code build(input)}).
+     * Each cuboid's {@link GridTable} is closed right after it has been written. If writing fails
+     * part-way, the tables not yet written are still closed before the exception propagates;
+     * close failures for those leftover tables are logged rather than thrown.
+     *
+     * @param input  source rows handed to {@code build(input)}
+     * @param output receives every record of every cuboid
+     * @throws IOException if building, writing, or closing a written cuboid's table fails
+     */
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        TreeMap<Long, CuboidResult> result = build(input);
+        try {
+            Map.Entry<Long, CuboidResult> entry;
+            // pollFirstEntry() removes each cuboid as this loop takes ownership of its table, so
+            // the finally block below only sees tables that were never written
+            while ((entry = result.pollFirstEntry()) != null) {
+                CuboidResult cuboidResult = entry.getValue();
+                try {
+                    outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                } finally {
+                    cuboidResult.table.close();
+                }
+            }
+        } finally {
+            for (CuboidResult leftover : result.values()) {
+                try {
+                    leftover.table.close();
+                } catch (IOException e) {
+                    logger.warn("Close cuboid " + leftover.cuboidId + " exception", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Scans every record of {@code gridTable} and writes it to {@code output} under
+     * {@code cuboidId}, then logs how long the write took. The scanner is closed even when a
+     * write fails.
+     *
+     * @throws IOException if scanning, writing, or closing the scanner fails
+     */
+    private void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        try {
+            for (GTRecord record : scanner) {
+                output.write(cuboidId, record);
+            }
+        } finally {
+            scanner.close();
+        }
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+
+    /**
+     * Builds all cuboids from {@code input} and returns them keyed, and therefore sorted, by
+     * cuboid ID. The caller owns the returned {@link GridTable}s and is responsible for closing them.
+     *
+     * @param input source rows handed to {@link #build(BlockingQueue, ICuboidCollector)}
+     * @return every collected cuboid, keyed by cuboid ID
+     * @throws IOException if the underlying build fails
+     */
+    public TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+        final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
+        ICuboidCollector collector = new ICuboidCollector() {
+            @Override
+            public void collect(CuboidResult cuboidResult) {
+                // builders may call collect() from several worker threads at once (InMemCubeBuilder
+                // calls it from its cuboid task threads), and TreeMap is not thread-safe
+                synchronized (result) {
+                    result.put(cuboidResult.cuboidId, cuboidResult);
+                }
+            }
+        };
+        build(input, collector);
+        return result;
+    }
+
+    /**
+     * Builds the cuboids from {@code input}, passing each finished cuboid to {@code collector}.
+     *
+     * @throws IOException if the build fails
+     */
+    public abstract void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException;
+
+    /**
+     * Callback that receives each cuboid once a builder has finished it. A builder may invoke it
+     * from more than one thread (InMemCubeBuilder calls it from its cuboid task threads).
+     */
+    public interface ICuboidCollector {
+        void collect(CuboidResult result);
+    }
+
+    /**
+     * Outcome of building a single cuboid: its ID, the {@link GridTable} holding its rows, and
+     * figures reported by the builder. Fields are public and mutable.
+     */
+    public static class CuboidResult {
+        /** ID of the cuboid this result describes. */
+        public long cuboidId;
+        /** Rows of the cuboid; the consumer of the result is expected to close it. */
+        public GridTable table;
+        /** Row count reported by the builder. */
+        public int nRows;
+        /** Build time reported by the builder (presumably milliseconds — TODO confirm). */
+        public long timeSpent;
+        /** Aggregation-cache size, in MB, reported by the builder. */
+        public int aggrCacheMB;
+
+        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+            this.aggrCacheMB = aggrCacheMB;
+            this.timeSpent = timeSpent;
+            this.nRows = nRows;
+            this.table = table;
+            this.cuboidId = cuboidId;
+        }
+    }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 8d0b0fb..996e747 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,21 +16,15 @@
*/
package org.apache.kylin.job.inmemcubing;
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.DoubleWritable;
@@ -57,7 +51,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.GridTable;
import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.IGTStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +66,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
private static final LongWritable ONE = new LongWritable(1l);
- private final long baseCuboidId;
private final CuboidScheduler cuboidScheduler;
+ private final long baseCuboidId;
+ private final int totalCuboidCount;
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
@@ -88,15 +82,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private TreeSet<CuboidTask> taskPending;
private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
- private OutputThread outputThread;
- private int outputCuboidExpected;
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
+ private ICuboidCollector resultCollector;
public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
super(cubeDesc, dictionaryMap);
this.cuboidScheduler = new CuboidScheduler(cubeDesc);
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ this.totalCuboidCount = cuboidScheduler.getCuboidCount();
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
@@ -153,7 +147,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
@Override
- public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ public void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
@@ -163,14 +157,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
taskThreads = prepareTaskThreads();
taskThreadExceptions = new Throwable[taskThreadCount];
- // output goes in a separate thread to leverage any async-ness
- outputThread = new OutputThread(output);
- outputCuboidExpected = outputThread.getOutputCuboidExpected();
-
// build base cuboid
+ resultCollector = collector;
totalSumForSanityCheck = null;
baseResult = createBaseCuboid(input);
- taskCuboidCompleted.incrementAndGet();
if (baseResult.nRows == 0)
return;
@@ -180,11 +170,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
// kick off N-D cuboid tasks and output
addChildTasks(baseResult);
start(taskThreads);
- start(outputThread);
// wait complete
join(taskThreads);
- join(outputThread);
long endTime = System.currentTimeMillis();
logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
@@ -194,7 +182,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
+ /** Interrupts all cuboid task threads that build() starts. */
public void abort() {
interrupt(taskThreads);
- interrupt(outputThread);
}
private void start(Thread... threads) {
@@ -223,9 +210,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (t != null)
errors.add(t);
}
- if (outputThread.getException() != null) {
- errors.add(outputThread.getException());
- }
if (errors.isEmpty()) {
return;
} else if (errors.size() == 1) {
@@ -250,7 +234,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
+ /** True once the completed-cuboid counter equals totalCuboidCount, the spanning-tree size from CuboidScheduler.getCuboidCount(). */
public boolean isAllCuboidDone() {
- return taskCuboidCompleted.get() == outputCuboidExpected;
+ return taskCuboidCompleted.get() == totalCuboidCount;
}
private class CuboidTaskThread extends Thread {
@@ -280,8 +264,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
addChildTasks(newCuboid);
- task.parent.markOneSpanningDone();
- taskCuboidCompleted.incrementAndGet();
if (isAllCuboidDone()) {
for (Thread t : taskThreads) {
@@ -387,8 +369,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (aggrCacheMB <= 0) {
aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
}
+
CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
- outputThread.addOutput(result);
+ taskCuboidCompleted.incrementAndGet();
+
+ resultCollector.collect(result);
return result;
}
@@ -397,7 +382,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
@Override
public int freeUp(int mb) {
- return 0; // cannot free up
+ return 0; // cannot free up on demand
}
@Override
@@ -493,17 +478,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private void closeStore(GridTable gt) {
- IGTStore store = gt.getStore();
- if (store instanceof Closeable) {
- try {
- ((Closeable) store).close();
- } catch (IOException e) {
- logger.warn("Close " + store + " exception", e);
- }
- }
- }
-
// ===========================================================================
private static class CuboidTask implements Comparable<CuboidTask> {
@@ -522,162 +496,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- private class CuboidResult {
- long cuboidId;
- GridTable table;
- int nRows;
- @SuppressWarnings("unused")
- long timeSpent;
- int aggrCacheMB;
- boolean outputDone;
- int spanningDone;
-
- public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
- this.cuboidId = cuboidId;
- this.table = table;
- this.nRows = nRows;
- this.timeSpent = timeSpent;
- this.aggrCacheMB = aggrCacheMB;
- }
-
- synchronized void markOutputDone() {
- outputDone = true;
- closeIfAllDone();
- }
-
- synchronized void markOneSpanningDone() {
- spanningDone++;
- closeIfAllDone();
- }
-
- private void closeIfAllDone() {
- if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) {
- closeStore(table);
- }
- }
- }
-
- // ============================================================================
-
- private class OutputThread extends Thread {
- private ICuboidWriter output;
- private SortedMap<Long, Long> outputSequence; // synchronized sorted map
- private LinkedBlockingDeque<CuboidResult> outputPending;
- private int outputCount;
- private int outputCuboidExpected;
- private Throwable outputThreadException;
-
- OutputThread(ICuboidWriter output) {
- super("CuboidOutput");
- this.output = output;
- this.outputSequence = prepareOutputSequence();
- this.outputPending = new LinkedBlockingDeque<CuboidResult>();
- this.outputCount = 0;
- this.outputCuboidExpected = outputSequence.size();
-
- if (outputOrderRequired == false)
- outputSequence = null;
- }
-
- public int getOutputCuboidExpected() {
- return outputCuboidExpected;
- }
-
- private SortedMap<Long, Long> prepareOutputSequence() {
- TreeMap<Long, Long> result = new TreeMap<Long, Long>();
- prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
- return Collections.synchronizedSortedMap(result);
- }
-
- private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
- result.put(cuboidId, cuboidId);
- for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
- prepareOutputPendingRecursive(child, result);
- }
- }
-
- public void addOutput(CuboidResult result) {
- // if output is NOT ordered
- if (outputSequence == null) {
- outputPending.addLast(result);
- }
- // if output is ordered
- else {
- Long cuboidId = outputSequence.get(result.cuboidId);
- synchronized (cuboidId) {
- outputPending.addFirst(result);
- cuboidId.notify();
- }
- }
- }
-
- private CuboidResult nextOutput() throws InterruptedException {
- CuboidResult result = null;
-
- // if output is NOT ordered
- if (outputSequence == null) {
- while (result == null && taskHasNoException()) {
- result = outputPending.pollFirst(60, TimeUnit.SECONDS);
- }
- }
- // if output is ordered
- else {
- Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
- synchronized (nextCuboidId) {
- while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
- nextCuboidId.wait(60000);
- }
- }
- outputSequence.remove(result.cuboidId);
- }
-
- return result;
- }
-
- @Override
- public void run() {
- try {
- while (outputCount < outputCuboidExpected) {
- CuboidResult result = nextOutput();
-
- // if task error occurs
- if (result == null || result.table == null)
- break;
-
- outputCuboid(result.cuboidId, result.table);
- outputCount++;
- result.markOutputDone();
- }
- } catch (Throwable ex) {
- logger.error("output thread exception", ex);
- outputThreadException = ex;
- }
- }
-
- private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- output.write(cuboidId, record);
- }
- scanner.close();
- logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
- }
-
- private CuboidResult findPendingOutput(Long cuboidId) {
- for (CuboidResult r : outputPending) {
- if (r.cuboidId == cuboidId)
- return r;
- }
- return null;
- }
-
- public Throwable getException() {
- return outputThreadException;
- }
- }
-
// ============================================================================
private class InputConverter implements IGTScanner {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index 20b543a..092227b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -1,8 +1,9 @@
package org.apache.kylin.storage.gridtable;
+import java.io.Closeable;
import java.io.IOException;
-public class GridTable {
+public class GridTable implements Closeable {
final GTInfo info;
final IGTStore store;
@@ -50,4 +51,11 @@ public class GridTable {
public IGTStore getStore() {
return store;
}
+
+    /**
+     * Closes the underlying store when it is {@link Closeable}; for any other store this is a no-op.
+     *
+     * @throws IOException if closing the store fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (!(store instanceof Closeable)) {
+            return; // store holds nothing that needs releasing
+        }
+        ((Closeable) store).close();
+    }
}