You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/05 09:16:36 UTC
incubator-kylin git commit: minor,
set InMemCubeBuilder don't enforce output order by default
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 4f7b1bffa -> d1a2682a5
minor, set InMemCubeBuilder don't enforce output order by default
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1a2682a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1a2682a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1a2682a
Branch: refs/heads/0.8.0
Commit: d1a2682a520207ac4a95ccafe8fafdb73827f8c8
Parents: 4f7b1bf
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jun 5 15:15:49 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jun 5 15:16:21 2015 +0800
----------------------------------------------------------------------
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 286 +++++++++++++------
1 file changed, 194 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1a2682a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 650fdb3..5833b0f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,12 +16,32 @@
*/
package org.apache.kylin.job.inmemcubing;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.*;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
@@ -34,16 +54,19 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.IGTStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
*/
@@ -73,13 +96,11 @@ public class InMemCubeBuilder implements Runnable {
private Throwable[] taskThreadExceptions;
private TreeSet<CuboidTask> taskPending;
private AtomicInteger taskCuboidCompleted;
- private CuboidResult baseResult;
- private SortedMap<Long, CuboidResult> outputPending;
- private Thread outputThread;
- private Throwable outputThreadException;
+ private OutputThread outputThread;
private int outputCuboidExpected;
-
+ private boolean outputOrderRequired;
+ private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
@@ -140,6 +161,10 @@ public class InMemCubeBuilder implements Runnable {
this.taskThreadCount = n;
}
+ public void setOutputOrder(boolean required) { // true = output cuboids following the sorted output sequence; the backing field has no initializer, so it is false unless this is called
+ this.outputOrderRequired = required; // consulted in the OutputThread constructor, so it must be set before that thread object is created
+ }
+
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
@@ -183,10 +208,8 @@ public class InMemCubeBuilder implements Runnable {
taskThreadExceptions = new Throwable[taskThreadCount];
// output goes in a separate thread to leverage any async-ness
- outputPending = prepareOutputPending();
- outputCuboidExpected = outputPending.size();
- outputThread = prepareOutputThread();
- outputThreadException = null;
+ outputThread = new OutputThread();
+ outputCuboidExpected = outputThread.getOutputCuboidExpected();
// build base cuboid
baseResult = createBaseCuboid();
@@ -233,8 +256,8 @@ public class InMemCubeBuilder implements Runnable {
if (t != null)
errors.add(t);
}
- if (outputThreadException != null) {
- errors.add(outputThreadException);
+ if (outputThread.getException() != null) {
+ errors.add(outputThread.getException());
}
if (errors.isEmpty()) {
return;
@@ -286,6 +309,7 @@ public class InMemCubeBuilder implements Runnable {
CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
addChildTasks(newCuboid);
+ task.parent.markOneSpanningDone();
taskCuboidCompleted.incrementAndGet();
if (taskCuboidCompleted.get() == outputCuboidExpected) {
@@ -323,50 +347,6 @@ public class InMemCubeBuilder implements Runnable {
}
}
- private SortedMap<Long, CuboidResult> prepareOutputPending() {
- TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
- prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
- return Collections.synchronizedSortedMap(result);
- }
-
- private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, CuboidResult> result) {
- result.put(cuboidId, new CuboidResult(cuboidId, null, 0, 0, 0));
- for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
- prepareOutputPendingRecursive(child, result);
- }
- }
-
- private Thread prepareOutputThread() {
- return new Thread("CuboidOutput") {
- public void run() {
- try {
- while (!outputPending.isEmpty()) {
- CuboidResult result = outputPending.get(outputPending.firstKey());
- synchronized (result) {
- while (result.table == null && taskHasNoException()) {
- try {
- result.wait(60000);
- } catch (InterruptedException e) {
- logger.error("interrupted", e);
- }
- }
- }
-
- // if task error occurs
- if (result.table == null)
- break;
-
- outputCuboid(result.cuboidId, result.table);
- outputPending.remove(result.cuboidId);
- }
- } catch (Throwable ex) {
- logger.error("output thread exception", ex);
- outputThreadException = ex;
- }
- }
- };
- }
-
private int getSystemAvailMB() {
Runtime.getRuntime().gc();
try {
@@ -434,18 +414,12 @@ public class InMemCubeBuilder implements Runnable {
return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
}
- private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int mbBaseAggrCache) {
- CuboidResult result = outputPending.get(cuboidId);
- result.table = table;
- result.nRows = nRows;
- result.timeSpent = timeSpent;
- result.aggrCacheMB = mbBaseAggrCache;
- if (result.aggrCacheMB <= 0) {
- result.aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
- }
- synchronized (result) {
- result.notify();
+ private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ if (aggrCacheMB <= 0) {
+ aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
}
+ CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+ outputThread.addOutput(result);
return result;
}
@@ -579,23 +553,14 @@ public class InMemCubeBuilder implements Runnable {
}
}
- private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- this.outputWriter.write(cuboidId, record);
- }
- scanner.close();
- logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-
- closeStore(gridTable);
- }
-
- private void closeStore(GridTable gt) throws IOException {
+ private void closeStore(GridTable gt) {
IGTStore store = gt.getStore();
if (store instanceof Closeable) {
- ((Closeable) store).close();
+ try {
+ ((Closeable) store).close();
+ } catch (IOException e) {
+ logger.warn("Close " + store + " exception", e);
+ }
}
}
@@ -617,13 +582,15 @@ public class InMemCubeBuilder implements Runnable {
}
}
- private static class CuboidResult {
+ private class CuboidResult {
long cuboidId;
GridTable table;
int nRows;
@SuppressWarnings("unused")
long timeSpent;
int aggrCacheMB;
+ boolean outputDone;
+ int spanningDone;
public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
this.cuboidId = cuboidId;
@@ -632,6 +599,141 @@ public class InMemCubeBuilder implements Runnable {
this.timeSpent = timeSpent;
this.aggrCacheMB = aggrCacheMB;
}
+
+ synchronized void markOutputDone() { // invoked by the output thread after this cuboid has been written out
+ outputDone = true;
+ closeIfAllDone(); // the store is only closed if all spanning cuboids have also reported done
+ }
+
+ synchronized void markOneSpanningDone() { // invoked on a task's parent result once one child cuboid has been built from it
+ spanningDone++; // plain int; safe only because this method is synchronized
+ closeIfAllDone();
+ }
+
+ private void closeIfAllDone() { // both callers in this class are synchronized, so the fields below are read under this object's monitor
+ if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) { // written out AND every spanning cuboid of this one has been accounted for
+ closeStore(table); // nobody needs the grid table any more
+ }
+ }
+ }
+
+ // ============================================================================
+
+    /**
+     * Thread that takes finished cuboid results and writes their records
+     * through the enclosing builder's outputWriter.
+     * <p>
+     * The mode is fixed at construction time from outputOrderRequired:
+     * <ul>
+     * <li>unordered (outputSequence == null): results are written in the order
+     * they were added;</li>
+     * <li>ordered: results are written following the key order of
+     * outputSequence, waiting for each expected cuboid in turn.</li>
+     * </ul>
+     * The loop ends early when nextOutput() yields no result (a task failed);
+     * any Throwable raised while writing is kept and exposed via getException().
+     */
+    private class OutputThread extends Thread {
+        private SortedMap<Long, Long> outputSequence; // synchronized sorted map; null when output order is not enforced
+        private LinkedBlockingDeque<CuboidResult> outputPending; // results added but not yet handed to the writer
+        private int outputCount;
+        private int outputCuboidExpected;
+        private Throwable outputThreadException;
+
+        OutputThread() {
+            super("CuboidOutput");
+            // the sequence is always built first because its size gives the expected cuboid count
+            this.outputSequence = prepareOutputSequence();
+            this.outputPending = new LinkedBlockingDeque<CuboidResult>();
+            this.outputCount = 0;
+            this.outputCuboidExpected = outputSequence.size();
+
+            if (outputOrderRequired == false)
+                outputSequence = null;
+        }
+
+        /** @return number of distinct cuboids reachable from the base cuboid, i.e. how many outputs are expected */
+        public int getOutputCuboidExpected() {
+            return outputCuboidExpected;
+        }
+
+        /** Collects every cuboid id reachable from the base cuboid into a synchronized sorted map (id mapped to itself). */
+        private SortedMap<Long, Long> prepareOutputSequence() {
+            TreeMap<Long, Long> result = new TreeMap<Long, Long>();
+            prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
+            return Collections.synchronizedSortedMap(result);
+        }
+
+        /** Adds cuboidId and, recursively, all of its spanning cuboids to result. */
+        private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
+            result.put(cuboidId, cuboidId);
+            for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
+                prepareOutputPendingRecursive(child, result);
+            }
+        }
+
+        /**
+         * Hands a finished cuboid result over to this thread.
+         *
+         * @param result the result to be written out
+         */
+        public void addOutput(CuboidResult result) {
+            // if output is NOT ordered
+            if (outputSequence == null) {
+                outputPending.addLast(result);
+            }
+            // if output is ordered
+            else {
+                // The monitor is the Long instance stored as the map value; nextOutput()
+                // obtains the very same instance through outputSequence.get(), so the
+                // notify below reaches a consumer waiting for this cuboid id.
+                Long cuboidId = outputSequence.get(result.cuboidId);
+                synchronized (cuboidId) {
+                    outputPending.addFirst(result);
+                    cuboidId.notify();
+                }
+            }
+        }
+
+        /**
+         * Waits for the next result to write.
+         *
+         * @return the next result, or null when waiting was abandoned because
+         *         taskHasNoException() returned false
+         * @throws InterruptedException if the thread is interrupted while waiting
+         */
+        private CuboidResult nextOutput() throws InterruptedException {
+            CuboidResult result = null;
+
+            // if output is NOT ordered
+            if (outputSequence == null) {
+                while (result == null && taskHasNoException()) {
+                    result = outputPending.pollFirst(60, TimeUnit.SECONDS);
+                }
+                return result;
+            }
+
+            // if output is ordered
+            Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
+            synchronized (nextCuboidId) {
+                while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
+                    nextCuboidId.wait(60000);
+                }
+            }
+
+            // The wait loop also exits when a task has failed; in that case there is
+            // no result to dereference, so report "nothing" and let run() stop.
+            if (result == null) {
+                return null;
+            }
+
+            // Drop the result from the pending deque so it is not retained (and not
+            // re-scanned by findPendingOutput) after it has been handed out.
+            outputPending.remove(result);
+            outputSequence.remove(result.cuboidId);
+            return result;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (outputCount < outputCuboidExpected) {
+                    CuboidResult result = nextOutput();
+
+                    // if task error occurs
+                    if (result == null || result.table == null)
+                        break;
+
+                    outputCuboid(result.cuboidId, result.table);
+                    outputCount++;
+                    result.markOutputDone();
+                }
+            } catch (Throwable ex) {
+                logger.error("output thread exception", ex);
+                outputThreadException = ex;
+            }
+        }
+
+        /**
+         * Scans the whole grid table and writes every record to outputWriter.
+         *
+         * @param cuboidId  id passed to the writer along with each record
+         * @param gridTable table holding the cuboid's records
+         * @throws IOException if scanning, writing or closing the scanner fails
+         */
+        private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
+            long startTime = System.currentTimeMillis();
+            GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+            IGTScanner scanner = gridTable.scan(req);
+            try {
+                for (GTRecord record : scanner) {
+                    outputWriter.write(cuboidId, record);
+                }
+            } finally {
+                // release the scanner even when a write fails part-way through
+                scanner.close();
+            }
+            logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+        }
+
+        /** Linear search of the pending deque for the result of the given cuboid; null if it has not arrived yet. */
+        private CuboidResult findPendingOutput(Long cuboidId) {
+            for (CuboidResult r : outputPending) {
+                if (r.cuboidId == cuboidId)
+                    return r;
+            }
+            return null;
+        }
+
+        /** @return the Throwable caught in run(), or null if none was recorded */
+        public Throwable getException() {
+            return outputThreadException;
+        }
     }
// ============================================================================